Files
tiny-forge/internal/deployer/deployer.go
T
alexei.dolgolyov eef60a4302 feat(docker-watcher): phase 6 - webhook handler
Secret UUID-based webhook endpoint for CI image push notifications.
Project/stage matching via glob patterns, auto-creation of unknown
projects from image inspection. Fix JSON response injection.
2026-03-27 21:56:18 +03:00

484 lines
16 KiB
Go

package deployer
import (
"context"
"encoding/json"
"fmt"
"log"
"sort"
"github.com/alexei/docker-watcher/internal/crypto"
"github.com/alexei/docker-watcher/internal/docker"
"github.com/alexei/docker-watcher/internal/health"
"github.com/alexei/docker-watcher/internal/notify"
"github.com/alexei/docker-watcher/internal/npm"
"github.com/alexei/docker-watcher/internal/store"
"github.com/google/uuid"
)
// Deployer orchestrates the full deployment flow: pull image, create container,
// start, configure proxy, health check, and handle rollback on failure.
// It implements both webhook.DeployTriggerer and registry.DeployTriggerer.
type Deployer struct {
// docker pulls images, ensures the network exists and creates, starts and
// removes containers.
docker *docker.Client
// npm authenticates against the proxy manager and finds, creates, updates
// and deletes proxy hosts.
npm *npm.Client
// store holds projects, stages, settings, registries, deploy records,
// deploy logs and instance records.
store *store.Store
// health runs the HTTP health check against a freshly started container.
health *health.Checker
// notifier sends deploy_success / deploy_failure events.
notifier *notify.Notifier
// encKey is the key handed to crypto.Decrypt for the stored registry token
// and NPM password.
encKey [32]byte
}
// New wires the supplied Docker client, NPM client, store, health checker,
// notifier and encryption key into a Deployer and returns it.
// No validation is performed on the arguments.
func New(
	dockerClient *docker.Client,
	npmClient *npm.Client,
	st *store.Store,
	checker *health.Checker,
	notifier *notify.Notifier,
	encKey [32]byte,
) *Deployer {
	d := new(Deployer)
	d.docker = dockerClient
	d.npm = npmClient
	d.store = st
	d.health = checker
	d.notifier = notifier
	d.encKey = encKey
	return d
}
// TriggerDeploy runs one deployment of imageTag for the given project and stage.
//
// It loads the project, stage and settings from the store, records a new
// deploy in "pending" state, makes room for the new instance via
// enforceMaxInstances (a failure there is logged but does not abort the
// deploy) and then runs executeDeploy.
//
// If the pipeline fails, whatever state it produced is handed to rollback, a
// "deploy_failure" notification is sent and the wrapped error is returned.
// On success the deploy is marked "success" and a "deploy_success"
// notification carrying the subdomain and public URL is sent.
func (d *Deployer) TriggerDeploy(ctx context.Context, projectID, stageID, imageTag string) error {
	// Everything the pipeline needs is read up front; any lookup failure aborts
	// before a deploy record exists.
	proj, err := d.store.GetProjectByID(projectID)
	if err != nil {
		return fmt.Errorf("get project: %w", err)
	}
	stg, err := d.store.GetStageByID(stageID)
	if err != nil {
		return fmt.Errorf("get stage: %w", err)
	}
	cfg, err := d.store.GetSettings()
	if err != nil {
		return fmt.Errorf("get settings: %w", err)
	}

	pending := store.Deploy{
		ProjectID: projectID,
		StageID:   stageID,
		ImageTag:  imageTag,
		Status:    "pending",
	}
	rec, err := d.store.CreateDeploy(pending)
	if err != nil {
		return fmt.Errorf("create deploy record: %w", err)
	}

	startMsg := fmt.Sprintf("Starting deploy of %s:%s for project %s, stage %s", proj.Image, imageTag, proj.Name, stg.Name)
	d.logDeploy(rec.ID, startMsg, "info")

	// Trimming old instances is best-effort: the deploy proceeds regardless.
	if limitErr := d.enforceMaxInstances(ctx, stg, rec.ID, cfg); limitErr != nil {
		d.logDeploy(rec.ID, fmt.Sprintf("Failed to enforce max instances: %v", limitErr), "error")
	}

	// The pipeline reports what it managed to create so it can be undone.
	containerID, npmProxyID, instanceID, deployErr := d.executeDeploy(ctx, proj, stg, cfg, rec.ID, imageTag)
	if deployErr != nil {
		d.logDeploy(rec.ID, fmt.Sprintf("Deploy failed: %v", deployErr), "error")
		// NOTE(review): rollback receives the same ctx; if the failure was caused
		// by ctx cancellation, cleanup calls may fail too — confirm rollback copes.
		d.rollback(ctx, rec.ID, containerID, npmProxyID, instanceID)

		failure := notify.Event{
			Type:     "deploy_failure",
			Project:  proj.Name,
			Stage:    stg.Name,
			ImageTag: imageTag,
			Error:    deployErr.Error(),
		}
		d.notifier.Send(cfg.NotificationURL, failure)
		return fmt.Errorf("deploy failed: %w", deployErr)
	}

	// A failed status write does not undo a deployment that is already live.
	if statusErr := d.store.UpdateDeployStatus(rec.ID, "success", ""); statusErr != nil {
		log.Printf("deployer: update deploy status to success: %v", statusErr)
	}

	host := d.buildSubdomain(proj, stg, cfg, imageTag)
	publicURL := fmt.Sprintf("https://%s.%s", host, cfg.Domain)
	d.logDeploy(rec.ID, fmt.Sprintf("Deploy successful: %s", publicURL), "info")

	success := notify.Event{
		Type:      "deploy_success",
		Project:   proj.Name,
		Stage:     stg.Name,
		ImageTag:  imageTag,
		Subdomain: host,
		URL:       publicURL,
	}
	d.notifier.Send(cfg.NotificationURL, success)
	return nil
}
// executeDeploy performs the deploy pipeline in order: pull the image, ensure
// the network, create the container and its instance record, start the
// container, configure the NPM proxy and, when the project defines one, run
// the health check.
//
// It returns (containerID, npmProxyID, instanceID, error). On error the first
// three values describe whatever had been created up to the failing step so
// the caller can roll it back; values for steps not yet reached are zero.
// Failures to update deploy/instance status in the store are logged only.
func (d *Deployer) executeDeploy(
	ctx context.Context,
	project store.Project,
	stage store.Stage,
	settings store.Settings,
	deployID string,
	imageTag string,
) (string, int, string, error) {
	var (
		containerID string
		npmProxyID  int
		instanceID  string
	)

	// Pull the image, using registry credentials when the project has them.
	if statusErr := d.store.UpdateDeployStatus(deployID, "pulling", ""); statusErr != nil {
		log.Printf("deployer: update deploy status: %v", statusErr)
	}
	d.logDeploy(deployID, fmt.Sprintf("Pulling image %s:%s", project.Image, imageTag), "info")

	registryAuth, err := d.buildRegistryAuth(project)
	if err != nil {
		return containerID, npmProxyID, instanceID, fmt.Errorf("build registry auth: %w", err)
	}
	err = d.docker.PullImage(ctx, project.Image, imageTag, registryAuth)
	if err != nil {
		return containerID, npmProxyID, instanceID, fmt.Errorf("pull image: %w", err)
	}
	d.logDeploy(deployID, "Image pulled successfully", "info")

	// Make sure the configured network is available.
	netID, err := d.docker.EnsureNetwork(ctx, settings.Network)
	if err != nil {
		return containerID, npmProxyID, instanceID, fmt.Errorf("ensure network: %w", err)
	}
	d.logDeploy(deployID, fmt.Sprintf("Network %s ready (ID: %s)", settings.Network, truncateID(netID)), "info")

	// Create the container.
	if statusErr := d.store.UpdateDeployStatus(deployID, "starting", ""); statusErr != nil {
		log.Printf("deployer: update deploy status: %v", statusErr)
	}

	// The instance ID is generated before the container exists because it is
	// passed into the container configuration.
	instanceID = uuid.New().String()
	host := d.buildSubdomain(project, stage, settings, imageTag)
	name := docker.ContainerName(project.Name, stage.Name, imageTag)

	spec := docker.ContainerConfig{
		Name:         name,
		Image:        project.Image + ":" + imageTag,
		Env:          d.parseEnvVars(project.Env),
		ExposedPorts: []string{fmt.Sprintf("%d/tcp", project.Port)},
		NetworkName:  settings.Network,
		NetworkID:    netID,
		Project:      project.Name,
		Stage:        stage.Name,
		InstanceID:   instanceID,
	}

	d.logDeploy(deployID, fmt.Sprintf("Creating container %s", name), "info")
	containerID, err = d.docker.CreateContainer(ctx, spec)
	if err != nil {
		return containerID, npmProxyID, instanceID, fmt.Errorf("create container: %w", err)
	}
	d.logDeploy(deployID, fmt.Sprintf("Container created (ID: %s)", truncateID(containerID)), "info")

	// Persist the instance under the pre-generated ID; it stays "stopped"
	// until the container has actually been started.
	record := store.Instance{
		ID:          instanceID,
		StageID:     stage.ID,
		ProjectID:   project.ID,
		ContainerID: containerID,
		ImageTag:    imageTag,
		Subdomain:   host,
		Status:      "stopped",
		Port:        project.Port,
	}
	inst, err := d.store.CreateInstanceWithID(record)
	if err != nil {
		return containerID, npmProxyID, instanceID, fmt.Errorf("create instance record: %w", err)
	}
	instanceID = inst.ID

	// Associate the deploy with its instance (best-effort).
	if linkErr := d.store.SetDeployInstanceID(deployID, instanceID); linkErr != nil {
		log.Printf("deployer: link deploy to instance: %v", linkErr)
	}

	// Start the container.
	d.logDeploy(deployID, fmt.Sprintf("Starting container %s", name), "info")
	err = d.docker.StartContainer(ctx, containerID)
	if err != nil {
		return containerID, npmProxyID, instanceID, fmt.Errorf("start container: %w", err)
	}
	if statusErr := d.store.UpdateInstanceStatus(instanceID, "running"); statusErr != nil {
		log.Printf("deployer: update instance status to running: %v", statusErr)
	}
	d.logDeploy(deployID, "Container started", "info")

	// Route the subdomain to the container through NPM.
	if statusErr := d.store.UpdateDeployStatus(deployID, "configuring_proxy", ""); statusErr != nil {
		log.Printf("deployer: update deploy status: %v", statusErr)
	}
	npmProxyID, err = d.configureProxy(ctx, deployID, settings, name, project.Port, host)
	if err != nil {
		return containerID, npmProxyID, instanceID, fmt.Errorf("configure proxy: %w", err)
	}

	// Remember which proxy host belongs to this instance.
	inst.NpmProxyID = npmProxyID
	inst.Subdomain = host
	if updateErr := d.store.UpdateInstance(inst); updateErr != nil {
		log.Printf("deployer: update instance with proxy ID: %v", updateErr)
	}

	// The health check only runs when the project configures a path.
	if project.Healthcheck == "" {
		d.logDeploy(deployID, "No health check configured, skipping", "info")
		return containerID, npmProxyID, instanceID, nil
	}

	if statusErr := d.store.UpdateDeployStatus(deployID, "health_checking", ""); statusErr != nil {
		log.Printf("deployer: update deploy status: %v", statusErr)
	}
	probeURL := fmt.Sprintf("http://%s:%d%s", name, project.Port, project.Healthcheck)
	d.logDeploy(deployID, fmt.Sprintf("Running health check: %s", probeURL), "info")
	err = d.health.Check(ctx, probeURL)
	if err != nil {
		return containerID, npmProxyID, instanceID, fmt.Errorf("health check: %w", err)
	}
	d.logDeploy(deployID, "Health check passed", "info")

	return containerID, npmProxyID, instanceID, nil
}
// configureProxy points <subdomain>.<settings.Domain> at containerName:containerPort
// in NPM. It decrypts the NPM password from settings, authenticates, and then
// either updates the proxy host already registered for that domain or creates
// a new one. It returns the ID of the resulting proxy host, or 0 with an
// error if any step fails.
func (d *Deployer) configureProxy(
	ctx context.Context,
	deployID string,
	settings store.Settings,
	containerName string,
	containerPort int,
	subdomain string,
) (int, error) {
	// Log in to NPM with the stored credentials.
	password, err := d.decryptNpmPassword(settings.NpmPassword)
	if err != nil {
		return 0, fmt.Errorf("decrypt npm password: %w", err)
	}
	err = d.npm.Authenticate(ctx, settings.NpmEmail, password)
	if err != nil {
		return 0, fmt.Errorf("authenticate to npm: %w", err)
	}

	fqdn := subdomain + "." + settings.Domain
	d.logDeploy(deployID, fmt.Sprintf("Configuring proxy: %s -> %s:%d", fqdn, containerName, containerPort), "info")

	// A host may already be registered for this domain.
	current, exists, err := d.npm.FindProxyHostByDomain(ctx, fqdn)
	if err != nil {
		return 0, fmt.Errorf("find existing proxy host: %w", err)
	}

	desired := npm.ProxyHostConfig{
		DomainNames:    []string{fqdn},
		ForwardScheme:  "http",
		ForwardHost:    containerName,
		ForwardPort:    containerPort,
		BlockExploits:  true,
		AllowWebsocket: true,
		HTTP2Support:   true,
		Meta:           npm.Meta{},
		Locations:      []any{},
	}

	if !exists {
		d.logDeploy(deployID, fmt.Sprintf("Creating new proxy host for %s", fqdn), "info")
		created, err := d.npm.CreateProxyHost(ctx, desired)
		if err != nil {
			return 0, fmt.Errorf("create proxy host: %w", err)
		}
		d.logDeploy(deployID, fmt.Sprintf("Proxy host created (ID: %d)", created.ID), "info")
		return created.ID, nil
	}

	d.logDeploy(deployID, fmt.Sprintf("Updating existing proxy host %d for %s", current.ID, fqdn), "info")
	updated, err := d.npm.UpdateProxyHost(ctx, current.ID, desired)
	if err != nil {
		return 0, fmt.Errorf("update proxy host: %w", err)
	}
	d.logDeploy(deployID, "Proxy host updated", "info")
	return updated.ID, nil
}
// enforceMaxInstances frees a slot for the upcoming deployment when the stage
// is at or above its MaxInstances limit. Only instances whose status is
// "running" or "stopped" count; the oldest ones (by CreatedAt) are removed
// first. A MaxInstances of zero or less disables the limit.
//
// Only a failure to list the stage's instances is returned as an error;
// failures to remove an individual instance are written to the deploy log at
// "warn" level and the loop moves on.
func (d *Deployer) enforceMaxInstances(ctx context.Context, stage store.Stage, deployID string, settings store.Settings) error {
	limit := stage.MaxInstances
	if limit <= 0 {
		return nil
	}

	all, err := d.store.GetInstancesByStageID(stage.ID)
	if err != nil {
		return fmt.Errorf("get instances for stage: %w", err)
	}

	// Instances in any other state (e.g. failed or already being removed) do
	// not occupy a slot.
	var live []store.Instance
	for _, candidate := range all {
		switch candidate.Status {
		case "running", "stopped":
			live = append(live, candidate)
		}
	}

	// The +1 reserves the slot the new instance is about to take.
	excess := len(live) - limit + 1
	if excess <= 0 {
		return nil
	}

	// Oldest first.
	sort.Slice(live, func(a, b int) bool {
		return live[a].CreatedAt < live[b].CreatedAt
	})

	// limit >= 1 here, so excess never exceeds len(live).
	for _, victim := range live[:excess] {
		d.logDeploy(deployID, fmt.Sprintf("Removing oldest instance %s (tag: %s) to enforce max_instances=%d", victim.ID, victim.ImageTag, limit), "info")
		if rmErr := d.removeInstance(ctx, victim, settings); rmErr != nil {
			d.logDeploy(deployID, fmt.Sprintf("Failed to remove instance %s: %v", victim.ID, rmErr), "warn")
			continue
		}
		d.logDeploy(deployID, fmt.Sprintf("Removed instance %s", victim.ID), "info")
	}
	return nil
}
// removeInstance tears down everything belonging to inst: it marks the
// instance as "removing", force-removes its Docker container (when it has
// one), deletes its NPM proxy host (when it has one) and finally deletes the
// instance record from the store.
//
// Container and proxy cleanup are best-effort: their failures are logged and
// do not stop the removal. Only a failure to delete the instance record is
// returned as an error.
func (d *Deployer) removeInstance(ctx context.Context, inst store.Instance, settings store.Settings) error {
	// Mark as removing so the instance stops counting as active.
	if err := d.store.UpdateInstanceStatus(inst.ID, "removing"); err != nil {
		log.Printf("deployer: update instance %s status to removing: %v", inst.ID, err)
	}

	// Remove Docker container.
	if inst.ContainerID != "" {
		if err := d.docker.RemoveContainer(ctx, inst.ContainerID, true); err != nil {
			log.Printf("deployer: remove container %s: %v", inst.ContainerID, err)
		}
	}

	// Delete NPM proxy host. Every failure on this path is logged: previously
	// decrypt and authentication errors were dropped silently, which could
	// leave an orphaned proxy host behind with no trace of why.
	if inst.NpmProxyID > 0 {
		npmPassword, err := d.decryptNpmPassword(settings.NpmPassword)
		if err != nil {
			log.Printf("deployer: decrypt npm password to delete proxy host %d: %v", inst.NpmProxyID, err)
		} else if err := d.npm.Authenticate(ctx, settings.NpmEmail, npmPassword); err != nil {
			log.Printf("deployer: authenticate to npm to delete proxy host %d: %v", inst.NpmProxyID, err)
		} else if err := d.npm.DeleteProxyHost(ctx, inst.NpmProxyID); err != nil {
			log.Printf("deployer: delete proxy host %d: %v", inst.NpmProxyID, err)
		}
	}

	// Delete instance record.
	if err := d.store.DeleteInstance(inst.ID); err != nil {
		return fmt.Errorf("delete instance record: %w", err)
	}
	return nil
}
// buildSubdomain returns the subdomain for an instance of project/stage at
// imageTag. It delegates to GenerateTaggedSubdomain, passing the subdomain
// pattern from settings and the stage's own Subdomain value.
func (d *Deployer) buildSubdomain(project store.Project, stage store.Stage, settings store.Settings, imageTag string) string {
	pattern := settings.SubdomainPattern
	return GenerateTaggedSubdomain(
		pattern,
		project.Name,
		stage.Name,
		imageTag,
		stage.Subdomain,
	)
}
// buildRegistryAuth returns the encoded Docker registry auth string to use
// when pulling the project's image.
//
// It returns an empty string (and no error) when the project names no
// registry, when no stored registry has that name, or when the matching
// registry has no token. Otherwise the registry's token is decrypted with the
// deployer's key and encoded via docker.EncodeRegistryAuth.
func (d *Deployer) buildRegistryAuth(project store.Project) (string, error) {
	wanted := project.Registry
	if wanted == "" {
		return "", nil
	}

	known, err := d.store.GetAllRegistries()
	if err != nil {
		return "", fmt.Errorf("get registries: %w", err)
	}

	// The first registry with a matching name decides the outcome.
	for _, reg := range known {
		if reg.Name != wanted {
			continue
		}
		if reg.Token == "" {
			return "", nil
		}
		secret, err := crypto.Decrypt(d.encKey, reg.Token)
		if err != nil {
			return "", fmt.Errorf("decrypt registry token: %w", err)
		}
		// NOTE(review): the decrypted token is passed as both the first and
		// second argument (presumably username and password) — confirm this
		// is what docker.EncodeRegistryAuth expects.
		return docker.EncodeRegistryAuth(secret, secret, reg.URL)
	}
	return "", nil
}
// decryptNpmPassword returns the plaintext of encryptedPassword, decrypted
// with the deployer's key. An empty input yields an empty password and no
// error, without attempting decryption.
func (d *Deployer) decryptNpmPassword(encryptedPassword string) (string, error) {
	if encryptedPassword != "" {
		return crypto.Decrypt(d.encKey, encryptedPassword)
	}
	return "", nil
}
// parseEnvVars converts a JSON object of string keys to string values into a
// slice of KEY=VALUE environment variable strings, sorted by key.
//
// It returns nil when envJSON is empty or "{}", and also when envJSON is not
// valid JSON for a map[string]string (the parse error is logged, not
// returned).
//
// Keys are sorted because Go map iteration order is unspecified: without the
// sort the same configuration would yield a differently ordered slice on every
// call.
func (d *Deployer) parseEnvVars(envJSON string) []string {
	if envJSON == "" || envJSON == "{}" {
		return nil
	}

	var envMap map[string]string
	if err := json.Unmarshal([]byte(envJSON), &envMap); err != nil {
		log.Printf("deployer: parse env vars: %v", err)
		return nil
	}

	keys := make([]string, 0, len(envMap))
	for k := range envMap {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	vars := make([]string, 0, len(keys))
	for _, k := range keys {
		vars = append(vars, k+"="+envMap[k])
	}
	return vars
}
// logDeploy records message at the given level in the deploy's log in the
// store. A failure to write the entry is reported through the standard logger
// and otherwise ignored, so logging never interrupts a deployment.
func (d *Deployer) logDeploy(deployID, message, level string) {
	err := d.store.AppendDeployLog(deployID, message, level)
	if err == nil {
		return
	}
	log.Printf("deployer: append deploy log: %v", err)
}
// truncateID shortens id to its first 12 bytes for display. IDs of 12 bytes
// or fewer, including the empty string, are returned unchanged.
func truncateID(id string) string {
	const shortLen = 12
	if len(id) <= shortLen {
		return id
	}
	return id[:shortLen]
}