0405ecd9ce
Build / build (push) Successful in 10m36s
Outgoing notifications were bare POSTs with no auth and no way to verify
they came from Tinyforge. They also went out from one global URL only,
even though stages had a notification_url field, and static-site sync
emitted no events at all.
Schema: add notification_url + notification_secret (lazy-generated) to
settings, projects, stages and static_sites. Migrations are additive.
Notifier: SendSigned computes HMAC-SHA256 over the exact body bytes and
sends X-Hub-Signature-256 (GitHub-compatible — receivers built for
GitHub/Gitea/Forgejo verify out of the box). Aux headers
X-Tinyforge-Event/Delivery/Timestamp/Tier are advisory and not signed.
Empty secret => unsigned send for back-compat.
Resolution: deploys fall through stage > project > settings, sites fall
through site > settings. The secret travels with the URL that sourced
it, so any tier can sign even when its parents are unsigned. Site sync
events now actually emit (site_sync_success / site_sync_failure).
API: 12 new endpoints — {GET secret, POST regenerate, POST disable,
POST test} for each of the 4 tiers. SendSyncForTest returns
status_code/latency_ms/signature_sent/delivery_id/response_snippet so
the UI surfaces receiver feedback inline.
UI: shared OutgoingWebhookPanel.svelte fits the existing card aesthetic.
Signing-state pill, secret reveal-on-demand, regenerate/disable behind
ConfirmDialog modals (not inline strips — too easy to misclick), send-
test result card with colour-coded status. Wired into Settings →
Integrations, project edit form, per-stage edit, and per-site detail.
EN + RU i18n.
Tests: round-trip (sender signs, receiver verifies), tampered-body and
wrong-secret rejection, unsigned-send omits header, send-test surfaces
4xx, concurrent fan-out via Drain. Resolver precedence locked for both
deploy and site paths.
Docs: docs/webhooks.md with header reference, verifier snippets in
Node/Python/Go, and a recipe for the service-to-notification-bridge
generic webhook provider.
852 lines
27 KiB
Go
852 lines
27 KiB
Go
package deployer
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"sort"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/alexei/tinyforge/internal/crypto"
|
|
"github.com/alexei/tinyforge/internal/dns"
|
|
"github.com/alexei/tinyforge/internal/docker"
|
|
"github.com/alexei/tinyforge/internal/events"
|
|
"github.com/alexei/tinyforge/internal/health"
|
|
"github.com/alexei/tinyforge/internal/notify"
|
|
"github.com/alexei/tinyforge/internal/proxy"
|
|
"github.com/alexei/tinyforge/internal/store"
|
|
"github.com/alexei/tinyforge/internal/volume"
|
|
"github.com/moby/moby/api/types/mount"
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// Deployer orchestrates the full deployment flow: pull image, create container,
|
|
// start, configure proxy, health check, and handle rollback on failure.
|
|
// It implements both webhook.DeployTriggerer and registry.DeployTriggerer.
|
|
type Deployer struct {
|
|
docker *docker.Client
|
|
proxy proxy.Provider
|
|
store *store.Store
|
|
health *health.Checker
|
|
notifier *notify.Notifier
|
|
eventBus EventPublisher
|
|
encKey [32]byte
|
|
dnsMu sync.RWMutex
|
|
dns dns.Provider // nil when wildcard DNS is active
|
|
|
|
// Graceful shutdown: tracks in-progress deploys.
|
|
activeWg sync.WaitGroup
|
|
shuttingDown atomic.Bool
|
|
}
|
|
|
|
// EventPublisher is the interface for publishing events to the event bus.
|
|
type EventPublisher interface {
|
|
Publish(evt events.Event)
|
|
}
|
|
|
|
// New creates a new Deployer with all required dependencies.
|
|
func New(
|
|
dockerClient *docker.Client,
|
|
proxyProvider proxy.Provider,
|
|
st *store.Store,
|
|
checker *health.Checker,
|
|
notifier *notify.Notifier,
|
|
eventBus EventPublisher,
|
|
encKey [32]byte,
|
|
) *Deployer {
|
|
return &Deployer{
|
|
docker: dockerClient,
|
|
proxy: proxyProvider,
|
|
store: st,
|
|
health: checker,
|
|
notifier: notifier,
|
|
eventBus: eventBus,
|
|
encKey: encKey,
|
|
}
|
|
}
|
|
|
|
// SetProxyProvider updates the proxy provider at runtime (e.g., when settings change).
|
|
func (d *Deployer) SetProxyProvider(provider proxy.Provider) {
|
|
d.proxy = provider
|
|
}
|
|
|
|
// SetDNSProvider sets the DNS provider for managing DNS records during deployments.
|
|
// Pass nil to disable DNS management (wildcard DNS mode).
|
|
func (d *Deployer) SetDNSProvider(provider dns.Provider) {
|
|
d.dnsMu.Lock()
|
|
defer d.dnsMu.Unlock()
|
|
d.dns = provider
|
|
}
|
|
|
|
// getDNS returns the current DNS provider under read lock.
|
|
func (d *Deployer) getDNS() dns.Provider {
|
|
d.dnsMu.RLock()
|
|
defer d.dnsMu.RUnlock()
|
|
return d.dns
|
|
}
|
|
|
|
// Drain waits for all in-progress deploys to complete. Call this during graceful shutdown.
|
|
func (d *Deployer) Drain() {
|
|
d.shuttingDown.Store(true)
|
|
slog.Info("deployer: draining in-progress deploys")
|
|
d.activeWg.Wait()
|
|
slog.Info("deployer: all deploys drained")
|
|
}
|
|
|
|
// AsyncTriggerDeploy creates a deploy record and returns the deploy ID immediately,
|
|
// then runs the full deploy pipeline in a background goroutine. Use this from HTTP handlers
|
|
// to avoid blocking the request. Progress is streamed via SSE.
|
|
func (d *Deployer) AsyncTriggerDeploy(ctx context.Context, projectID, stageID, imageTag string) (string, error) {
|
|
if d.shuttingDown.Load() {
|
|
return "", fmt.Errorf("deployer is shutting down, rejecting new deploy")
|
|
}
|
|
|
|
// Validate inputs synchronously so the caller gets immediate feedback.
|
|
project, err := d.store.GetProjectByID(projectID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get project: %w", err)
|
|
}
|
|
stage, err := d.store.GetStageByID(stageID)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get stage: %w", err)
|
|
}
|
|
if err := d.validatePromoteFrom(stage, imageTag); err != nil {
|
|
return "", fmt.Errorf("promote validation: %w", err)
|
|
}
|
|
|
|
// Create deploy record synchronously so caller gets the ID.
|
|
deploy, err := d.store.CreateDeploy(store.Deploy{
|
|
ProjectID: projectID,
|
|
StageID: stageID,
|
|
ImageTag: imageTag,
|
|
Status: "pending",
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("create deploy record: %w", err)
|
|
}
|
|
|
|
// Run the actual deploy in the background.
|
|
d.activeWg.Add(1)
|
|
go func() {
|
|
defer d.activeWg.Done()
|
|
// Use a detached context so client disconnect doesn't abort the deploy.
|
|
bgCtx := context.Background()
|
|
if err := d.runDeploy(bgCtx, project, stage, deploy.ID, imageTag); err != nil {
|
|
slog.Error("async deploy failed", "deploy_id", deploy.ID, "error", err)
|
|
}
|
|
}()
|
|
|
|
return deploy.ID, nil
|
|
}
|
|
|
|
// runDeploy is the internal deploy pipeline used by AsyncTriggerDeploy.
|
|
// It assumes the deploy record already exists and project/stage are validated.
|
|
func (d *Deployer) runDeploy(ctx context.Context, project store.Project, stage store.Stage, deployID string, imageTag string) error {
|
|
settings, err := d.store.GetSettings()
|
|
if err != nil {
|
|
if updateErr := d.store.UpdateDeployStatus(deployID, "failed", err.Error()); updateErr != nil {
|
|
slog.Warn("update deploy status", "error", updateErr)
|
|
}
|
|
return fmt.Errorf("get settings: %w", err)
|
|
}
|
|
|
|
slog.Info("starting deploy",
|
|
"deploy_id", deployID,
|
|
"project", project.Name,
|
|
"stage", stage.Name,
|
|
"tag", imageTag,
|
|
)
|
|
d.logDeploy(deployID, fmt.Sprintf("Starting deploy of %s:%s for project %s, stage %s", project.Image, imageTag, project.Name, stage.Name), "info")
|
|
|
|
// Enforce max_instances before deploying.
|
|
if err := d.enforceMaxInstances(ctx, stage, deployID, settings); err != nil {
|
|
d.logDeploy(deployID, fmt.Sprintf("Failed to enforce max instances: %v", err), "error")
|
|
}
|
|
|
|
var containerID string
|
|
var proxyRouteID string
|
|
var instanceID string
|
|
var deployErr error
|
|
|
|
if stage.MaxInstances == 1 {
|
|
containerID, proxyRouteID, instanceID, deployErr = d.blueGreenDeploy(ctx, project, stage, settings, deployID, imageTag)
|
|
} else {
|
|
containerID, proxyRouteID, instanceID, deployErr = d.executeDeploy(ctx, project, stage, settings, deployID, imageTag)
|
|
}
|
|
|
|
if deployErr != nil {
|
|
d.logDeploy(deployID, fmt.Sprintf("Deploy failed: %v", deployErr), "error")
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "failed", deployErr.Error())
|
|
d.rollback(ctx, deployID, containerID, proxyRouteID, instanceID)
|
|
|
|
url, secret, tier := resolveDeployTarget(stage, project, settings)
|
|
d.notifier.SendSigned(url, secret, tier, notify.Event{
|
|
Type: "deploy_failure",
|
|
Project: project.Name,
|
|
Stage: stage.Name,
|
|
ImageTag: imageTag,
|
|
Error: deployErr.Error(),
|
|
})
|
|
|
|
return fmt.Errorf("deploy failed: %w", deployErr)
|
|
}
|
|
|
|
if err := d.store.UpdateDeployStatus(deployID, "success", ""); err != nil {
|
|
slog.Warn("update deploy status to success", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "success", "")
|
|
|
|
subdomain := d.buildSubdomain(project, stage, settings, imageTag)
|
|
fullURL := fmt.Sprintf("https://%s.%s", subdomain, settings.Domain)
|
|
|
|
d.logDeploy(deployID, fmt.Sprintf("Deploy successful: %s", fullURL), "info")
|
|
|
|
url, secret, tier := resolveDeployTarget(stage, project, settings)
|
|
d.notifier.SendSigned(url, secret, tier, notify.Event{
|
|
Type: "deploy_success",
|
|
Project: project.Name,
|
|
Stage: stage.Name,
|
|
ImageTag: imageTag,
|
|
Subdomain: subdomain,
|
|
URL: fullURL,
|
|
})
|
|
|
|
return nil
|
|
}
|
|
|
|
// resolveDeployTarget picks the most-specific (URL, secret, tier) for a
|
|
// deploy notification: stage > project > global. An empty URL at a tier
|
|
// means "fall through to the next" — never "send unsigned to nowhere". The
|
|
// secret is always paired with the URL that sourced it, so a stage can sign
|
|
// even when project and global are unsigned (and vice versa).
|
|
func resolveDeployTarget(stage store.Stage, project store.Project, settings store.Settings) (string, string, notify.Tier) {
|
|
if stage.NotificationURL != "" {
|
|
return stage.NotificationURL, stage.NotificationSecret, notify.TierStage
|
|
}
|
|
if project.NotificationURL != "" {
|
|
return project.NotificationURL, project.NotificationSecret, notify.TierProject
|
|
}
|
|
return settings.NotificationURL, settings.NotificationSecret, notify.TierSettings
|
|
}
|
|
|
|
// TriggerDeploy is the synchronous entry point for deployments (used by poller and webhook).
|
|
// It validates inputs, creates a deploy record, and delegates to runDeploy.
|
|
func (d *Deployer) TriggerDeploy(ctx context.Context, projectID, stageID, imageTag string) error {
|
|
if d.shuttingDown.Load() {
|
|
return fmt.Errorf("deployer is shutting down, rejecting new deploy")
|
|
}
|
|
|
|
d.activeWg.Add(1)
|
|
defer d.activeWg.Done()
|
|
|
|
project, err := d.store.GetProjectByID(projectID)
|
|
if err != nil {
|
|
return fmt.Errorf("get project: %w", err)
|
|
}
|
|
|
|
stage, err := d.store.GetStageByID(stageID)
|
|
if err != nil {
|
|
return fmt.Errorf("get stage: %w", err)
|
|
}
|
|
|
|
if err := d.validatePromoteFrom(stage, imageTag); err != nil {
|
|
return fmt.Errorf("promote validation: %w", err)
|
|
}
|
|
|
|
deploy, err := d.store.CreateDeploy(store.Deploy{
|
|
ProjectID: projectID,
|
|
StageID: stageID,
|
|
ImageTag: imageTag,
|
|
Status: "pending",
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("create deploy record: %w", err)
|
|
}
|
|
|
|
if err := d.runDeploy(ctx, project, stage, deploy.ID, imageTag); err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// executeDeploy runs the deploy pipeline steps and returns rollback-relevant state.
|
|
// It returns (containerID, proxyRouteID, instanceID, error).
|
|
func (d *Deployer) executeDeploy(
|
|
ctx context.Context,
|
|
project store.Project,
|
|
stage store.Stage,
|
|
settings store.Settings,
|
|
deployID string,
|
|
imageTag string,
|
|
) (string, string, string, error) {
|
|
var containerID string
|
|
var proxyRouteID string
|
|
var instanceID string
|
|
|
|
// Step 1: Pull image.
|
|
if err := d.store.UpdateDeployStatus(deployID, "pulling", ""); err != nil {
|
|
slog.Warn("update deploy status", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "pulling", "")
|
|
d.logDeploy(deployID, fmt.Sprintf("Pulling image %s:%s", project.Image, imageTag), "info")
|
|
|
|
authConfig, err := d.buildRegistryAuth(project)
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("build registry auth: %w", err)
|
|
}
|
|
|
|
if err := d.docker.PullImage(ctx, project.Image, imageTag, authConfig); err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("pull image: %w", err)
|
|
}
|
|
d.logDeploy(deployID, "Image pulled successfully", "info")
|
|
|
|
// Step 2: Ensure network exists.
|
|
if settings.Network == "" {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("docker network not configured in settings")
|
|
}
|
|
networkID, err := d.docker.EnsureNetwork(ctx, settings.Network)
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("ensure network: %w", err)
|
|
}
|
|
d.logDeploy(deployID, fmt.Sprintf("Network %s ready (ID: %s)", settings.Network, truncateID(networkID)), "info")
|
|
|
|
// Step 3: Create and start container.
|
|
if err := d.store.UpdateDeployStatus(deployID, "starting", ""); err != nil {
|
|
slog.Warn("update deploy status", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "starting", "")
|
|
|
|
// Pre-generate instance ID so it can be set as a container label.
|
|
instanceID = uuid.New().String()
|
|
subdomain := d.buildSubdomain(project, stage, settings, imageTag)
|
|
|
|
containerName := docker.ContainerName(project.Name, stage.Name, imageTag)
|
|
|
|
// Remove any stale container with the same name (e.g., from a previous failed deploy).
|
|
_ = d.docker.RemoveContainer(ctx, containerName, true)
|
|
|
|
portStr := fmt.Sprintf("%d/tcp", project.Port)
|
|
envVars := d.mergeEnvVars(project, stage.ID)
|
|
mounts := d.computeVolumeMounts(project.ID, project.Name, stage.Name, imageTag, settings.BaseVolumePath)
|
|
|
|
containerCfg := docker.ContainerConfig{
|
|
Name: containerName,
|
|
Image: project.Image + ":" + imageTag,
|
|
Env: envVars,
|
|
ExposedPorts: []string{portStr},
|
|
NetworkName: settings.Network,
|
|
NetworkID: networkID,
|
|
Project: project.Name,
|
|
Stage: stage.Name,
|
|
InstanceID: instanceID,
|
|
Mounts: mounts,
|
|
CpuLimit: stage.CpuLimit,
|
|
MemoryLimit: stage.MemoryLimit,
|
|
}
|
|
|
|
// Set proxy labels for providers that use Docker labels (e.g., Traefik).
|
|
if stage.EnableProxy {
|
|
fqdn := subdomain + "." + settings.Domain
|
|
if proxyLabels := d.proxy.ContainerLabels(fqdn, project.Port); proxyLabels != nil {
|
|
if containerCfg.Labels == nil {
|
|
containerCfg.Labels = make(map[string]string)
|
|
}
|
|
for k, v := range proxyLabels {
|
|
containerCfg.Labels[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
d.logDeploy(deployID, fmt.Sprintf("Creating container %s", containerName), "info")
|
|
containerID, err = d.docker.CreateContainer(ctx, containerCfg)
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("create container: %w", err)
|
|
}
|
|
d.logDeploy(deployID, fmt.Sprintf("Container created (ID: %s)", truncateID(containerID)), "info")
|
|
|
|
// Create instance record in store with the pre-generated ID.
|
|
inst, err := d.store.CreateInstanceWithID(store.Instance{
|
|
ID: instanceID,
|
|
StageID: stage.ID,
|
|
ProjectID: project.ID,
|
|
ContainerID: containerID,
|
|
ImageTag: imageTag,
|
|
Subdomain: subdomain,
|
|
Status: "stopped",
|
|
Port: project.Port,
|
|
})
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("create instance record: %w", err)
|
|
}
|
|
instanceID = inst.ID
|
|
|
|
// Link deploy to instance.
|
|
if err := d.store.SetDeployInstanceID(deployID, instanceID); err != nil {
|
|
slog.Warn("link deploy to instance", "error", err)
|
|
}
|
|
|
|
d.logDeploy(deployID, fmt.Sprintf("Starting container %s", containerName), "info")
|
|
if err := d.docker.StartContainer(ctx, containerID); err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("start container: %w", err)
|
|
}
|
|
|
|
if err := d.store.UpdateInstanceStatus(instanceID, "running"); err != nil {
|
|
slog.Warn("update instance status to running", "error", err)
|
|
}
|
|
if err := d.store.UpdateLastAliveAt(instanceID); err != nil {
|
|
slog.Warn("update last_alive_at on deploy", "instance_id", instanceID, "error", err)
|
|
}
|
|
d.publishInstanceStatus(instanceID, project.ID, stage.ID, "running")
|
|
d.logDeploy(deployID, "Container started", "info")
|
|
|
|
// Step 4: Configure NPM proxy (optional per stage).
|
|
if stage.EnableProxy {
|
|
if err := d.store.UpdateDeployStatus(deployID, "configuring_proxy", ""); err != nil {
|
|
slog.Warn("update deploy status", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "configuring_proxy", "")
|
|
|
|
accessListID := settings.NpmAccessListID
|
|
if project.NpmAccessListID > 0 {
|
|
accessListID = project.NpmAccessListID
|
|
}
|
|
|
|
proxyRouteID, err = d.configureProxy(ctx, deployID, settings, containerID, containerName, project.Port, subdomain, accessListID)
|
|
if err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("configure proxy: %w", err)
|
|
}
|
|
|
|
// Update instance with proxy route ID.
|
|
inst.ProxyRouteID = proxyRouteID
|
|
inst.Subdomain = subdomain
|
|
if err := d.store.UpdateInstance(inst); err != nil {
|
|
slog.Warn("update instance with proxy ID", "error", err)
|
|
}
|
|
|
|
// Create DNS record for this instance.
|
|
fqdn := subdomain + "." + settings.Domain
|
|
d.ensureDNS(ctx, fqdn, "instance", instanceID, deployID)
|
|
} else {
|
|
d.logDeploy(deployID, "Proxy creation skipped (disabled for this stage)", "info")
|
|
inst.Subdomain = subdomain
|
|
if err := d.store.UpdateInstance(inst); err != nil {
|
|
slog.Warn("update instance", "error", err)
|
|
}
|
|
}
|
|
|
|
// Step 5: Health check.
|
|
if project.Healthcheck != "" {
|
|
if err := d.store.UpdateDeployStatus(deployID, "health_checking", ""); err != nil {
|
|
slog.Warn("update deploy status", "error", err)
|
|
}
|
|
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "health_checking", "")
|
|
|
|
healthURL := fmt.Sprintf("http://%s:%d%s", containerName, project.Port, project.Healthcheck)
|
|
d.logDeploy(deployID, fmt.Sprintf("Running health check: %s", healthURL), "info")
|
|
|
|
if err := d.health.Check(ctx, healthURL); err != nil {
|
|
return containerID, proxyRouteID, instanceID, fmt.Errorf("health check: %w", err)
|
|
}
|
|
d.logDeploy(deployID, "Health check passed", "info")
|
|
} else {
|
|
d.logDeploy(deployID, "No health check configured, skipping", "info")
|
|
}
|
|
|
|
return containerID, proxyRouteID, instanceID, nil
|
|
}
|
|
|
|
// configureProxy creates or updates a proxy route for the deployed container.
|
|
// Uses the configured proxy.Provider (NPM, Traefik, or None).
|
|
// In NPM remote mode, uses server_ip + published host port instead of container name.
|
|
// Returns the proxy route ID string.
|
|
func (d *Deployer) configureProxy(
|
|
ctx context.Context,
|
|
deployID string,
|
|
settings store.Settings,
|
|
containerID string,
|
|
containerName string,
|
|
containerPort int,
|
|
subdomain string,
|
|
accessListID int,
|
|
) (string, error) {
|
|
fqdn := subdomain + "." + settings.Domain
|
|
|
|
forwardHost := containerName
|
|
forwardPort := containerPort
|
|
|
|
// In NPM remote mode, use server_ip and the published host port.
|
|
if settings.NpmRemote && settings.ProxyProvider == "npm" {
|
|
if settings.ServerIP == "" {
|
|
return "", fmt.Errorf("NPM remote mode requires Server IP to be configured in settings")
|
|
}
|
|
forwardHost = settings.ServerIP
|
|
|
|
hostPort, err := d.docker.InspectContainerPort(ctx, containerID, fmt.Sprintf("%d/tcp", containerPort))
|
|
if err != nil {
|
|
return "", fmt.Errorf("look up host port for remote NPM: %w", err)
|
|
}
|
|
forwardPort = int(hostPort)
|
|
d.logDeploy(deployID, fmt.Sprintf("NPM remote mode: using %s:%d (host port)", forwardHost, forwardPort), "info")
|
|
}
|
|
|
|
d.logDeploy(deployID, fmt.Sprintf("Configuring proxy (%s): %s -> %s:%d", d.proxy.Name(), fqdn, forwardHost, forwardPort), "info")
|
|
|
|
routeID, err := d.proxy.ConfigureRoute(ctx, fqdn, forwardHost, forwardPort, proxy.RouteOptions{
|
|
SSLCertificateID: settings.SSLCertificateID,
|
|
AccessListID: accessListID,
|
|
})
|
|
if err != nil {
|
|
return "", fmt.Errorf("configure proxy route: %w", err)
|
|
}
|
|
|
|
if routeID != "" {
|
|
d.logDeploy(deployID, fmt.Sprintf("Proxy route configured (ID: %s)", routeID), "info")
|
|
}
|
|
return routeID, nil
|
|
}
|
|
|
|
// enforceMaxInstances removes the oldest instances when the stage has reached its limit.
|
|
// This makes room for the new deployment.
|
|
func (d *Deployer) enforceMaxInstances(ctx context.Context, stage store.Stage, deployID string, settings store.Settings) error {
|
|
if stage.MaxInstances <= 0 {
|
|
return nil
|
|
}
|
|
|
|
instances, err := d.store.GetInstancesByStageID(stage.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("get instances for stage: %w", err)
|
|
}
|
|
|
|
// Filter to running/stopped instances (not already failed/removing).
|
|
var active []store.Instance
|
|
for _, inst := range instances {
|
|
if inst.Status == "running" || inst.Status == "stopped" {
|
|
active = append(active, inst)
|
|
}
|
|
}
|
|
|
|
// We need room for one more instance, so remove oldest when at limit.
|
|
removeCount := len(active) - stage.MaxInstances + 1
|
|
if removeCount <= 0 {
|
|
return nil
|
|
}
|
|
|
|
// Sort by created_at ascending (oldest first).
|
|
sort.Slice(active, func(i, j int) bool {
|
|
return active[i].CreatedAt < active[j].CreatedAt
|
|
})
|
|
|
|
for i := 0; i < removeCount && i < len(active); i++ {
|
|
inst := active[i]
|
|
d.logDeploy(deployID, fmt.Sprintf("Removing oldest instance %s (tag: %s) to enforce max_instances=%d", inst.ID, inst.ImageTag, stage.MaxInstances), "info")
|
|
|
|
if err := d.removeInstance(ctx, inst, settings); err != nil {
|
|
d.logDeploy(deployID, fmt.Sprintf("Failed to remove instance %s: %v", inst.ID, err), "warn")
|
|
continue
|
|
}
|
|
d.logDeploy(deployID, fmt.Sprintf("Removed instance %s", inst.ID), "info")
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// removeInstance stops and removes a container, deletes its NPM proxy host,
|
|
// and removes the instance record from the store.
|
|
func (d *Deployer) removeInstance(ctx context.Context, inst store.Instance, settings store.Settings) error {
|
|
// Mark as removing.
|
|
if err := d.store.UpdateInstanceStatus(inst.ID, "removing"); err != nil {
|
|
slog.Warn("update instance status to removing", "instance_id", inst.ID, "error", err)
|
|
}
|
|
|
|
// Remove Docker container.
|
|
if inst.ContainerID != "" {
|
|
if err := d.docker.RemoveContainer(ctx, inst.ContainerID, true); err != nil {
|
|
slog.Warn("remove container", "container_id", inst.ContainerID, "error", err)
|
|
}
|
|
}
|
|
|
|
// Delete proxy route.
|
|
if inst.ProxyRouteID != "" {
|
|
if err := d.proxy.DeleteRoute(ctx, inst.ProxyRouteID); err != nil {
|
|
slog.Warn("delete proxy route", "route_id", inst.ProxyRouteID, "error", err)
|
|
}
|
|
|
|
// Remove DNS record for this instance.
|
|
if inst.Subdomain != "" && settings.Domain != "" {
|
|
fqdn := inst.Subdomain + "." + settings.Domain
|
|
d.removeDNS(ctx, fqdn, "")
|
|
}
|
|
}
|
|
|
|
// Delete instance record.
|
|
if err := d.store.DeleteInstance(inst.ID); err != nil {
|
|
return fmt.Errorf("delete instance record: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// buildSubdomain generates the subdomain for an instance based on settings and stage config.
|
|
func (d *Deployer) buildSubdomain(project store.Project, stage store.Stage, settings store.Settings, imageTag string) string {
|
|
return GenerateTaggedSubdomain(settings.SubdomainPattern, project.Name, stage.Name, imageTag, stage.Subdomain)
|
|
}
|
|
|
|
// buildRegistryAuth constructs the Docker registry auth string for pulling images.
|
|
// If the project has a registry configured, it looks up the registry token.
|
|
func (d *Deployer) buildRegistryAuth(project store.Project) (string, error) {
|
|
if project.Registry == "" {
|
|
return "", nil
|
|
}
|
|
|
|
reg, err := d.store.GetRegistryByName(project.Registry)
|
|
if err != nil {
|
|
return "", fmt.Errorf("get registry %s: %w", project.Registry, err)
|
|
}
|
|
|
|
if reg.Token != "" {
|
|
decrypted, err := crypto.Decrypt(d.encKey, reg.Token)
|
|
if err != nil {
|
|
return "", fmt.Errorf("decrypt registry token: %w", err)
|
|
}
|
|
return docker.EncodeRegistryAuth(decrypted, decrypted, reg.URL)
|
|
}
|
|
return "", nil
|
|
}
|
|
|
|
// mergeEnvVars builds the final environment variable list for a container:
|
|
// 1. Parse project-level env JSON
|
|
// 2. Overlay with stage-level env overrides (stage wins on key conflict)
|
|
// 3. Decrypt any encrypted (secret) values
|
|
// Returns a []string of KEY=VALUE pairs.
|
|
func (d *Deployer) mergeEnvVars(project store.Project, stageID string) []string {
|
|
// Step 1: Parse project-level env.
|
|
envMap := make(map[string]string)
|
|
if project.Env != "" && project.Env != "{}" {
|
|
var projectEnv map[string]string
|
|
if err := json.Unmarshal([]byte(project.Env), &projectEnv); err != nil {
|
|
slog.Warn("parse project env vars", "error", err)
|
|
} else {
|
|
for k, v := range projectEnv {
|
|
envMap[k] = v
|
|
}
|
|
}
|
|
}
|
|
|
|
// Step 2: Overlay with stage-level overrides.
|
|
stageEnvs, err := d.store.GetStageEnvByStageID(stageID)
|
|
if err != nil {
|
|
slog.Warn("get stage env overrides", "stage_id", stageID, "error", err)
|
|
} else {
|
|
for _, se := range stageEnvs {
|
|
value := se.Value
|
|
if se.Encrypted {
|
|
// Step 3: Decrypt secret values.
|
|
decrypted, err := crypto.Decrypt(d.encKey, se.Value)
|
|
if err != nil {
|
|
slog.Warn("decrypt stage env value", "key", se.Key, "error", err)
|
|
continue
|
|
}
|
|
value = decrypted
|
|
}
|
|
envMap[se.Key] = value
|
|
}
|
|
}
|
|
|
|
vars := make([]string, 0, len(envMap))
|
|
for k, v := range envMap {
|
|
vars = append(vars, k+"="+v)
|
|
}
|
|
return vars
|
|
}
|
|
|
|
// computeVolumeMounts builds Docker mount specifications from the project's volume config.
|
|
// Uses the shared volume.ResolvePath for path resolution.
|
|
func (d *Deployer) computeVolumeMounts(projectID, projectName, stageName, imageTag, basePath string) []mount.Mount {
|
|
vols, err := d.store.GetVolumesByProjectID(projectID)
|
|
if err != nil {
|
|
slog.Warn("get project volumes", "project_id", projectID, "error", err)
|
|
return nil
|
|
}
|
|
|
|
if len(vols) == 0 {
|
|
return nil
|
|
}
|
|
|
|
params := volume.ResolveParams{
|
|
BasePath: basePath,
|
|
ProjectName: projectName,
|
|
StageName: stageName,
|
|
ImageTag: imageTag,
|
|
}
|
|
|
|
mounts := make([]mount.Mount, 0, len(vols))
|
|
for _, vol := range vols {
|
|
scope := vol.Scope
|
|
if scope == "" {
|
|
switch vol.Mode {
|
|
case "isolated":
|
|
scope = "instance"
|
|
default:
|
|
scope = "project"
|
|
}
|
|
}
|
|
|
|
// Ephemeral volumes use tmpfs — no host path.
|
|
if scope == "ephemeral" {
|
|
mounts = append(mounts, mount.Mount{
|
|
Type: mount.TypeTmpfs,
|
|
Target: vol.Target,
|
|
})
|
|
continue
|
|
}
|
|
|
|
source, err := volume.ResolvePath(vol, params)
|
|
if err != nil {
|
|
slog.Warn("resolve volume path", "volume_id", vol.ID, "error", err)
|
|
continue
|
|
}
|
|
|
|
mounts = append(mounts, mount.Mount{
|
|
Type: mount.TypeBind,
|
|
Source: source,
|
|
Target: vol.Target,
|
|
})
|
|
}
|
|
return mounts
|
|
}
|
|
|
|
// logDeploy appends a log entry for a deploy and publishes it on the event bus.
|
|
// Errors are logged to stderr but not propagated.
|
|
func (d *Deployer) logDeploy(deployID, message, level string) {
|
|
if err := d.store.AppendDeployLog(deployID, message, level); err != nil {
|
|
slog.Warn("append deploy log", "error", err)
|
|
}
|
|
if d.eventBus != nil {
|
|
d.eventBus.Publish(events.Event{
|
|
Type: events.EventDeployLog,
|
|
Payload: events.DeployLogPayload{
|
|
DeployID: deployID,
|
|
Message: message,
|
|
Level: level,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// publishDeployStatus publishes a deploy status change event on the bus.
|
|
func (d *Deployer) publishDeployStatus(deployID, projectID, stageID, imageTag, status, deployErr string) {
|
|
if d.eventBus != nil {
|
|
d.eventBus.Publish(events.Event{
|
|
Type: events.EventDeployStatus,
|
|
Payload: events.DeployStatusPayload{
|
|
DeployID: deployID,
|
|
ProjectID: projectID,
|
|
StageID: stageID,
|
|
ImageTag: imageTag,
|
|
Status: status,
|
|
Error: deployErr,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// publishInstanceStatus publishes an instance status change event on the bus.
|
|
func (d *Deployer) publishInstanceStatus(instanceID, projectID, stageID, status string) {
|
|
if d.eventBus != nil {
|
|
d.eventBus.Publish(events.Event{
|
|
Type: events.EventInstanceStatus,
|
|
Payload: events.InstanceStatusPayload{
|
|
InstanceID: instanceID,
|
|
ProjectID: projectID,
|
|
StageID: stageID,
|
|
Status: status,
|
|
},
|
|
})
|
|
}
|
|
}
|
|
|
|
// ensureDNS creates or updates a DNS record for the given FQDN. Best-effort: logs warnings on failure.
|
|
func (d *Deployer) ensureDNS(ctx context.Context, fqdn, consumerType, consumerID, deployID string) {
|
|
dnsProvider := d.getDNS()
|
|
if dnsProvider == nil {
|
|
return
|
|
}
|
|
settings, err := d.store.GetSettings()
|
|
if err != nil {
|
|
slog.Warn("dns: get settings for server IP", "error", err)
|
|
return
|
|
}
|
|
if settings.ServerIP == "" {
|
|
slog.Warn("dns: server IP not configured, skipping DNS record creation", "fqdn", fqdn)
|
|
return
|
|
}
|
|
|
|
recordID, err := dnsProvider.EnsureRecord(ctx, fqdn, settings.ServerIP)
|
|
if err != nil {
|
|
msg := fmt.Sprintf("DNS: failed to create/update record for %s: %v", fqdn, err)
|
|
slog.Warn(msg)
|
|
if deployID != "" {
|
|
d.logDeploy(deployID, msg, "warn")
|
|
}
|
|
return
|
|
}
|
|
|
|
// Track the record locally.
|
|
if _, err := d.store.CreateDNSRecord(store.DNSRecord{
|
|
FQDN: fqdn,
|
|
ProviderRecordID: recordID,
|
|
ConsumerType: consumerType,
|
|
ConsumerID: consumerID,
|
|
}); err != nil {
|
|
// May already exist — try updating.
|
|
if updateErr := d.store.UpdateDNSRecordProviderID(fqdn, recordID); updateErr != nil {
|
|
slog.Warn("dns: failed to track record", "fqdn", fqdn, "error", updateErr)
|
|
}
|
|
}
|
|
|
|
logMsg := fmt.Sprintf("DNS: record ensured for %s", fqdn)
|
|
slog.Info(logMsg)
|
|
if deployID != "" {
|
|
d.logDeploy(deployID, logMsg, "info")
|
|
}
|
|
}
|
|
|
|
// removeDNS deletes a DNS record for the given FQDN. Best-effort: logs warnings on failure.
|
|
func (d *Deployer) removeDNS(ctx context.Context, fqdn, deployID string) {
|
|
dnsProvider := d.getDNS()
|
|
if dnsProvider == nil {
|
|
return
|
|
}
|
|
|
|
if err := dnsProvider.DeleteRecord(ctx, fqdn); err != nil {
|
|
msg := fmt.Sprintf("DNS: failed to delete record for %s: %v", fqdn, err)
|
|
slog.Warn(msg)
|
|
if deployID != "" {
|
|
d.logDeploy(deployID, msg, "warn")
|
|
}
|
|
return
|
|
}
|
|
|
|
// Remove local tracking.
|
|
if err := d.store.DeleteDNSRecord(fqdn); err != nil {
|
|
slog.Warn("dns: failed to remove tracking record", "fqdn", fqdn, "error", err)
|
|
}
|
|
|
|
logMsg := fmt.Sprintf("DNS: record deleted for %s", fqdn)
|
|
slog.Info(logMsg)
|
|
if deployID != "" {
|
|
d.logDeploy(deployID, logMsg, "info")
|
|
}
|
|
}
|
|
|
|
// truncateID shortens a Docker ID to its first 12 bytes for display.
// IDs that are already 12 bytes or shorter are returned unchanged.
func truncateID(id string) string {
	const displayLen = 12
	if len(id) <= displayLen {
		return id
	}
	return id[:displayLen]
}
|
|
|