From aefecdffdf0c1cc215fcc0a58bb845de7646b5f3 Mon Sep 17 00:00:00 2001 From: "alexei.dolgolyov" Date: Mon, 30 Mar 2026 11:12:25 +0300 Subject: [PATCH] feat(observability): phase 2 - stale container detection Add periodic scanner for stale containers: - Cron-based scanner (hourly) detects non-running containers exceeding threshold - last_alive_at tracking on instances, updated on deploy/start/restart - API: GET /api/containers/stale, POST cleanup (single + bulk) - Event log warnings emitted for newly stale containers - Graceful handling of externally removed containers --- cmd/server/main.go | 9 + internal/api/instances.go | 7 + internal/api/router.go | 16 ++ internal/api/stale.go | 172 ++++++++++++++++++ internal/deployer/deployer.go | 3 + internal/stale/scanner.go | 316 ++++++++++++++++++++++++++++++++++ internal/store/instances.go | 89 ++++++++-- internal/store/models.go | 1 + internal/store/store.go | 2 + 9 files changed, 596 insertions(+), 19 deletions(-) create mode 100644 internal/api/stale.go create mode 100644 internal/stale/scanner.go diff --git a/cmd/server/main.go b/cmd/server/main.go index c369f7d..cf148cb 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -25,6 +25,7 @@ import ( "github.com/alexei/docker-watcher/internal/notify" "github.com/alexei/docker-watcher/internal/npm" "github.com/alexei/docker-watcher/internal/registry" + "github.com/alexei/docker-watcher/internal/stale" "github.com/alexei/docker-watcher/internal/store" "github.com/alexei/docker-watcher/internal/webhook" ) @@ -130,8 +131,15 @@ func main() { } } + // Initialize stale container scanner. + staleScanner := stale.New(db, dockerClient, eventBus) + if err := staleScanner.Start("1h"); err != nil { + slog.Warn("failed to start stale scanner", "error", err) + } + // Build API server. apiServer := api.NewServer(db, dockerClient, npmClient, dep, webhookHandler, eventBus, encKey) + apiServer.SetStaleScanner(staleScanner) router := apiServer.Router() // Serve embedded static files for the SPA frontend. @@ -173,6 +181,7 @@ func main() { slog.Info("shutting down...") // Stop accepting new work. + staleScanner.Stop() poller.Stop() // Drain in-progress deploys and notifications. diff --git a/internal/api/instances.go b/internal/api/instances.go index 4b936bc..26eca47 100644 --- a/internal/api/instances.go +++ b/internal/api/instances.go @@ -196,6 +196,13 @@ func (s *Server) controlInstance(w http.ResponseWriter, r *http.Request, action slog.Error("update instance status", "instance_id", instanceID, "status", newStatus, "error", err) } + // Track last_alive_at when container becomes running. + if newStatus == "running" { + if err := s.store.UpdateLastAliveAt(instanceID); err != nil { + slog.Error("update last_alive_at", "instance_id", instanceID, "error", err) + } + } + respondJSON(w, http.StatusOK, map[string]string{ "instance_id": instanceID, "action": action, diff --git a/internal/api/router.go b/internal/api/router.go index 8f4e656..606045d 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -11,6 +11,7 @@ import ( "github.com/alexei/docker-watcher/internal/docker" "github.com/alexei/docker-watcher/internal/events" "github.com/alexei/docker-watcher/internal/npm" + "github.com/alexei/docker-watcher/internal/stale" "github.com/alexei/docker-watcher/internal/store" "github.com/alexei/docker-watcher/internal/webhook" ) @@ -26,6 +27,7 @@ type Server struct { encKey [32]byte localAuth *auth.LocalAuth oidcProvider *auth.OIDCProvider + staleScanner *stale.Scanner } // NewServer creates a new API Server with all required dependencies. @@ -60,6 +62,12 @@ func NewServer( return s } +// SetStaleScanner sets the stale scanner on the server. +// Called after both the API server and scanner are initialized. +func (s *Server) SetStaleScanner(scanner *stale.Scanner) { + s.staleScanner = scanner +} + // initOIDCProvider creates an OIDC provider from settings. Errors are logged, not fatal. func (s *Server) initOIDCProvider(ctx context.Context, as store.AuthSettings) { // Decrypt the OIDC client secret if it's encrypted. @@ -135,6 +143,9 @@ func (s *Server) Router() chi.Router { r.Get("/settings", s.getSettings) r.Get("/settings/npm-certificates", s.listNpmCertificates) + // Stale container endpoints. + r.Get("/containers/stale", s.listStaleContainers) + // Admin-only routes: require admin role. r.Group(func(r chi.Router) { r.Use(auth.AdminOnly) @@ -192,6 +203,11 @@ func (s *Server) Router() chi.Router { r.Post("/test", s.testRegistry) }) + // Stale container cleanup endpoints (admin-only). + // Bulk route must be registered before parameterized route. + r.Post("/containers/stale/cleanup", s.bulkCleanupStaleContainers) + r.Post("/containers/stale/{id}/cleanup", s.cleanupStaleContainer) + // Settings endpoints. r.Put("/settings", s.updateSettings) r.Get("/settings/webhook-url", s.getWebhookURL) diff --git a/internal/api/stale.go b/internal/api/stale.go new file mode 100644 index 0000000..26d9c4e --- /dev/null +++ b/internal/api/stale.go @@ -0,0 +1,172 @@ +package api + +import ( + "errors" + "log/slog" + "net/http" + + "github.com/go-chi/chi/v5" + + "github.com/alexei/docker-watcher/internal/crypto" + "github.com/alexei/docker-watcher/internal/events" + "github.com/alexei/docker-watcher/internal/stale" + "github.com/alexei/docker-watcher/internal/store" +) + +// listStaleContainers handles GET /api/containers/stale. +func (s *Server) listStaleContainers(w http.ResponseWriter, r *http.Request) { + if s.staleScanner == nil { + respondError(w, http.StatusServiceUnavailable, "stale scanner not initialized") + return + } + + staleInstances, err := s.staleScanner.FindStaleInstances(r.Context()) + if err != nil { + respondError(w, http.StatusInternalServerError, "failed to find stale containers: "+err.Error()) + return + } + + if staleInstances == nil { + staleInstances = []stale.StaleInstance{} + } + respondJSON(w, http.StatusOK, staleInstances) +} + +// cleanupStaleContainer handles POST /api/containers/stale/{id}/cleanup. +// Stops the Docker container, removes the NPM proxy, and deletes the instance from the store. +func (s *Server) cleanupStaleContainer(w http.ResponseWriter, r *http.Request) { + instanceID := chi.URLParam(r, "id") + + inst, err := s.store.GetInstanceByID(instanceID) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + respondNotFound(w, "instance") + return + } + respondError(w, http.StatusInternalServerError, "failed to get instance: "+err.Error()) + return + } + + // Don't remove instances already being cleaned up. + if inst.Status == "removing" { + respondError(w, http.StatusConflict, "instance is already being removed") + return + } + + if err := s.cleanupInstance(r, inst); err != nil { + respondError(w, http.StatusInternalServerError, "failed to cleanup instance: "+err.Error()) + return + } + + respondJSON(w, http.StatusOK, map[string]string{"cleaned": instanceID}) +} + +// bulkCleanupStaleContainers handles POST /api/containers/stale/cleanup. +// Cleans up all currently stale containers. +func (s *Server) bulkCleanupStaleContainers(w http.ResponseWriter, r *http.Request) { + if s.staleScanner == nil { + respondError(w, http.StatusServiceUnavailable, "stale scanner not initialized") + return + } + + staleInstances, err := s.staleScanner.FindStaleInstances(r.Context()) + if err != nil { + respondError(w, http.StatusInternalServerError, "failed to find stale containers: "+err.Error()) + return + } + + var cleaned []string + var failed []string + + for _, si := range staleInstances { + if si.Instance.Status == "removing" { + continue + } + if err := s.cleanupInstance(r, si.Instance); err != nil { + slog.Error("bulk stale cleanup failed", + "instance_id", si.Instance.ID, "error", err) + failed = append(failed, si.Instance.ID) + continue + } + cleaned = append(cleaned, si.Instance.ID) + } + + respondJSON(w, http.StatusOK, map[string]any{ + "cleaned": cleaned, + "failed": failed, + }) +} + +// cleanupInstance stops a Docker container, removes the NPM proxy, deletes +// the store record, and emits an event. +func (s *Server) cleanupInstance(r *http.Request, inst store.Instance) error { + ctx := r.Context() + + // Mark as removing. + if err := s.store.UpdateInstanceStatus(inst.ID, "removing"); err != nil { + slog.Warn("stale cleanup: update status to removing", "instance_id", inst.ID, "error", err) + } + + // Stop and remove Docker container. + if inst.ContainerID != "" { + if err := s.docker.StopContainer(ctx, inst.ContainerID, 10); err != nil { + slog.Warn("stale cleanup: stop container", "container_id", inst.ContainerID, "error", err) + } + if err := s.docker.RemoveContainer(ctx, inst.ContainerID, true); err != nil { + slog.Warn("stale cleanup: remove container", "container_id", inst.ContainerID, "error", err) + } + } + + // Delete NPM proxy host if present. + if inst.NpmProxyID > 0 { + settings, err := s.store.GetSettings() + if err == nil { + npmPassword, err := crypto.Decrypt(s.encKey, settings.NpmPassword) + if err == nil { + if authErr := s.npm.Authenticate(ctx, settings.NpmEmail, npmPassword); authErr == nil { + if delErr := s.npm.DeleteProxyHost(ctx, inst.NpmProxyID); delErr != nil { + slog.Warn("stale cleanup: delete proxy host", "proxy_id", inst.NpmProxyID, "error", delErr) + } + } + } + } + } + + // Delete instance record. + if err := s.store.DeleteInstance(inst.ID); err != nil { + return err + } + + // Emit cleanup event. + s.emitStaleCleanupEvent(inst) + + return nil +} + +// emitStaleCleanupEvent publishes an event when a stale container is cleaned up. +func (s *Server) emitStaleCleanupEvent(inst store.Instance) { + msg := "Stale container cleaned up: " + inst.ID + " (tag: " + inst.ImageTag + ")" + + evt, err := s.store.InsertEvent(store.EventLog{ + Source: "stale_cleanup", + Severity: "info", + Message: msg, + Metadata: `{"instance_id":"` + inst.ID + `","project_id":"` + inst.ProjectID + `","stage_id":"` + inst.StageID + `"}`, + }) + if err != nil { + slog.Error("stale cleanup: failed to persist event", "error", err) + return + } + + s.eventBus.Publish(events.Event{ + Type: events.EventLog, + Payload: events.EventLogPayload{ + ID: evt.ID, + Source: "stale_cleanup", + Severity: "info", + Message: msg, + Metadata: evt.Metadata, + CreatedAt: evt.CreatedAt, + }, + }) +} diff --git a/internal/deployer/deployer.go b/internal/deployer/deployer.go index f8442f9..4b0cda3 100644 --- a/internal/deployer/deployer.go +++ b/internal/deployer/deployer.go @@ -333,6 +333,9 @@ func (d *Deployer) executeDeploy( if err := d.store.UpdateInstanceStatus(instanceID, "running"); err != nil { slog.Warn("update instance status to running", "error", err) } + if err := d.store.UpdateLastAliveAt(instanceID); err != nil { + slog.Warn("update last_alive_at on deploy", "instance_id", instanceID, "error", err) + } d.publishInstanceStatus(instanceID, project.ID, stage.ID, "running") d.logDeploy(deployID, "Container started", "info") diff --git a/internal/stale/scanner.go b/internal/stale/scanner.go new file mode 100644 index 0000000..94063fa --- /dev/null +++ b/internal/stale/scanner.go @@ -0,0 +1,316 @@ +package stale + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/alexei/docker-watcher/internal/docker" + "github.com/alexei/docker-watcher/internal/events" + "github.com/alexei/docker-watcher/internal/store" + "github.com/robfig/cron/v3" +) + +// StaleInstance holds enriched info about a stale container for API responses. +type StaleInstance struct { + Instance store.Instance `json:"instance"` + ProjectName string `json:"project_name"` + StageName string `json:"stage_name"` + DaysStale int `json:"days_stale"` +} + +// Scanner periodically checks for stale containers that have been +// non-running for longer than the configured threshold. +type Scanner struct { + store *store.Store + docker *docker.Client + eventBus *events.Bus + + cron *cron.Cron + mu sync.Mutex + entryID cron.EntryID + running bool + + // knownStale tracks instance IDs that have already had a stale event emitted, + // to avoid re-emitting warnings for the same instance. + knownStale map[string]struct{} +} + +// New creates a new stale container scanner. +func New(st *store.Store, dockerClient *docker.Client, eventBus *events.Bus) *Scanner { + return &Scanner{ + store: st, + docker: dockerClient, + eventBus: eventBus, + cron: cron.New(), + knownStale: make(map[string]struct{}), + } +} + +// Start begins the periodic stale container scan with the given interval (e.g., "1h", "30m"). +// If the scanner is already running, it stops and restarts with the new interval. +func (s *Scanner) Start(interval string) error { + s.mu.Lock() + defer s.mu.Unlock() + + duration, err := time.ParseDuration(interval) + if err != nil { + return fmt.Errorf("parse stale scan interval %q: %w", interval, err) + } + + if s.running { + s.cron.Remove(s.entryID) + } + + spec := fmt.Sprintf("@every %s", duration.String()) + entryID, err := s.cron.AddFunc(spec, func() { + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) + defer cancel() + if scanErr := s.Scan(ctx); scanErr != nil { + slog.Warn("stale scanner: scan error", "error", scanErr) + } + }) + if err != nil { + return fmt.Errorf("schedule stale scanner: %w", err) + } + + s.entryID = entryID + if !s.running { + s.cron.Start() + } + s.running = true + + slog.Info("stale scanner started", "interval", duration.String()) + return nil +} + +// Stop gracefully shuts down the scanner. +func (s *Scanner) Stop() { + s.mu.Lock() + defer s.mu.Unlock() + + if s.running { + ctx := s.cron.Stop() + <-ctx.Done() + s.running = false + slog.Info("stale scanner stopped") + } +} + +// Scan performs a single stale-container scan cycle. +// It updates last_alive_at for running containers and detects newly stale ones. +func (s *Scanner) Scan(ctx context.Context) error { + settings, err := s.store.GetSettings() + if err != nil { + return fmt.Errorf("get settings: %w", err) + } + + thresholdDays := settings.StaleThresholdDays + if thresholdDays <= 0 { + thresholdDays = 7 + } + + // Get all instances from the store. + instances, err := s.store.ListAllInstances() + if err != nil { + return fmt.Errorf("list all instances: %w", err) + } + + if len(instances) == 0 { + return nil + } + + // Get all managed Docker containers to check live state. + containers, err := s.docker.ListContainers(ctx, nil) + if err != nil { + return fmt.Errorf("list docker containers: %w", err) + } + + // Build a lookup: instance ID -> container state. + containerStateByInstanceID := make(map[string]string, len(containers)) + for _, c := range containers { + if c.InstanceID != "" { + containerStateByInstanceID[c.InstanceID] = c.State + } + } + + now := time.Now().UTC() + currentStaleIDs := make(map[string]struct{}) + + for _, inst := range instances { + // Skip instances already being cleaned up. + if inst.Status == "removing" { + continue + } + + dockerState := containerStateByInstanceID[inst.ID] + + // If the container is running in Docker, update last_alive_at. + if dockerState == "running" { + if err := s.store.UpdateLastAliveAt(inst.ID); err != nil { + slog.Warn("stale scanner: failed to update last_alive_at", + "instance_id", inst.ID, "error", err) + } + // Also sync store status if it was out of date. + if inst.Status != "running" { + if err := s.store.UpdateInstanceStatus(inst.ID, "running"); err != nil { + slog.Warn("stale scanner: failed to sync instance status", + "instance_id", inst.ID, "error", err) + } + } + continue + } + + // Container is not running. Check if it's stale. + if inst.LastAliveAt == "" { + // Never been seen running. Use created_at as fallback. + inst.LastAliveAt = inst.CreatedAt + } + + lastAlive, parseErr := time.Parse("2006-01-02 15:04:05", inst.LastAliveAt) + if parseErr != nil { + slog.Warn("stale scanner: failed to parse last_alive_at", + "instance_id", inst.ID, "last_alive_at", inst.LastAliveAt, "error", parseErr) + continue + } + + daysSinceAlive := int(now.Sub(lastAlive).Hours() / 24) + if daysSinceAlive < thresholdDays { + continue + } + + // This instance is stale. + currentStaleIDs[inst.ID] = struct{}{} + + // Emit event only if this is newly detected as stale. + if _, alreadyKnown := s.knownStale[inst.ID]; !alreadyKnown { + s.emitStaleEvent(inst, daysSinceAlive) + } + } + + // Update known stale set: remove IDs that are no longer stale. + s.knownStale = currentStaleIDs + + return nil +} + +// FindStaleInstances returns all currently stale instances with enriched project/stage info. +func (s *Scanner) FindStaleInstances(ctx context.Context) ([]StaleInstance, error) { + settings, err := s.store.GetSettings() + if err != nil { + return nil, fmt.Errorf("get settings: %w", err) + } + + thresholdDays := settings.StaleThresholdDays + if thresholdDays <= 0 { + thresholdDays = 7 + } + + instances, err := s.store.ListAllInstances() + if err != nil { + return nil, fmt.Errorf("list all instances: %w", err) + } + + containers, err := s.docker.ListContainers(ctx, nil) + if err != nil { + return nil, fmt.Errorf("list docker containers: %w", err) + } + + containerStateByInstanceID := make(map[string]string, len(containers)) + for _, c := range containers { + if c.InstanceID != "" { + containerStateByInstanceID[c.InstanceID] = c.State + } + } + + now := time.Now().UTC() + var result []StaleInstance + + for _, inst := range instances { + if inst.Status == "removing" { + continue + } + + // If Docker says it's running, it's not stale. + if containerStateByInstanceID[inst.ID] == "running" { + continue + } + + lastAlive := inst.LastAliveAt + if lastAlive == "" { + lastAlive = inst.CreatedAt + } + + lastAliveTime, parseErr := time.Parse("2006-01-02 15:04:05", lastAlive) + if parseErr != nil { + continue + } + + daysSinceAlive := int(now.Sub(lastAliveTime).Hours() / 24) + if daysSinceAlive < thresholdDays { + continue + } + + // Look up project and stage names. + projectName := inst.ProjectID + stageName := inst.StageID + if proj, err := s.store.GetProjectByID(inst.ProjectID); err == nil { + projectName = proj.Name + } + if stg, err := s.store.GetStageByID(inst.StageID); err == nil { + stageName = stg.Name + } + + result = append(result, StaleInstance{ + Instance: inst, + ProjectName: projectName, + StageName: stageName, + DaysStale: daysSinceAlive, + }) + } + + return result, nil +} + +// emitStaleEvent publishes a warning event for a newly detected stale container. +func (s *Scanner) emitStaleEvent(inst store.Instance, daysStale int) { + metadata, _ := json.Marshal(map[string]any{ + "instance_id": inst.ID, + "project_id": inst.ProjectID, + "stage_id": inst.StageID, + "image_tag": inst.ImageTag, + "last_alive_at": inst.LastAliveAt, + "days_stale": daysStale, + }) + + msg := fmt.Sprintf("Container %s (tag: %s) has been non-running for %d days", + inst.ID, inst.ImageTag, daysStale) + + // Persist directly to event log. + evt, err := s.store.InsertEvent(store.EventLog{ + Source: "stale_scanner", + Severity: "warn", + Message: msg, + Metadata: string(metadata), + }) + if err != nil { + slog.Error("stale scanner: failed to persist event", "error", err) + return + } + + // Publish for SSE clients. + s.eventBus.Publish(events.Event{ + Type: events.EventLog, + Payload: events.EventLogPayload{ + ID: evt.ID, + Source: "stale_scanner", + Severity: "warn", + Message: msg, + Metadata: string(metadata), + CreatedAt: evt.CreatedAt, + }, + }) +} diff --git a/internal/store/instances.go b/internal/store/instances.go index 2cb4cfb..d0fb730 100644 --- a/internal/store/instances.go +++ b/internal/store/instances.go @@ -8,6 +8,20 @@ import ( "github.com/google/uuid" ) +// instanceColumns is the canonical column list for instance queries. +const instanceColumns = `id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, last_alive_at, created_at, updated_at` + +// scanInstance scans a row into an Instance struct using the canonical column order. +func scanInstance(scanner interface{ Scan(...any) error }) (Instance, error) { + var inst Instance + err := scanner.Scan( + &inst.ID, &inst.StageID, &inst.ProjectID, &inst.ContainerID, &inst.ImageTag, + &inst.Subdomain, &inst.NpmProxyID, &inst.Status, &inst.Port, + &inst.LastAliveAt, &inst.CreatedAt, &inst.UpdatedAt, + ) + return inst, err +} + // CreateInstance inserts a new instance record. func (s *Store) CreateInstance(inst Instance) (Instance, error) { inst.ID = uuid.New().String() @@ -15,10 +29,11 @@ func (s *Store) CreateInstance(inst Instance) (Instance, error) { inst.UpdatedAt = inst.CreatedAt _, err := s.db.Exec( - `INSERT INTO instances (id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + `INSERT INTO instances (`+instanceColumns+`) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, inst.ID, inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag, - inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, inst.CreatedAt, inst.UpdatedAt, + inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, + inst.LastAliveAt, inst.CreatedAt, inst.UpdatedAt, ) if err != nil { return Instance{}, fmt.Errorf("insert instance: %w", err) @@ -36,10 +51,11 @@ func (s *Store) CreateInstanceWithID(inst Instance) (Instance, error) { inst.UpdatedAt = inst.CreatedAt _, err := s.db.Exec( - `INSERT INTO instances (id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, + `INSERT INTO instances (`+instanceColumns+`) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, inst.ID, inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag, - inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, inst.CreatedAt, inst.UpdatedAt, + inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, + inst.LastAliveAt, inst.CreatedAt, inst.UpdatedAt, ) if err != nil { return Instance{}, fmt.Errorf("insert instance: %w", err) @@ -49,12 +65,9 @@ func (s *Store) CreateInstanceWithID(inst Instance) (Instance, error) { // GetInstanceByID returns a single instance by its ID. func (s *Store) GetInstanceByID(id string) (Instance, error) { - var inst Instance - err := s.db.QueryRow( - `SELECT id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, created_at, updated_at - FROM instances WHERE id = ?`, id, - ).Scan(&inst.ID, &inst.StageID, &inst.ProjectID, &inst.ContainerID, &inst.ImageTag, - &inst.Subdomain, &inst.NpmProxyID, &inst.Status, &inst.Port, &inst.CreatedAt, &inst.UpdatedAt) + inst, err := scanInstance(s.db.QueryRow( + `SELECT `+instanceColumns+` FROM instances WHERE id = ?`, id, + )) if errors.Is(err, sql.ErrNoRows) { return Instance{}, fmt.Errorf("instance %s: %w", id, ErrNotFound) } @@ -67,8 +80,7 @@ func (s *Store) GetInstanceByID(id string) (Instance, error) { // GetInstancesByStageID returns all instances for a given stage. func (s *Store) GetInstancesByStageID(stageID string) ([]Instance, error) { rows, err := s.db.Query( - `SELECT id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, created_at, updated_at - FROM instances WHERE stage_id = ? ORDER BY created_at DESC`, stageID, + `SELECT `+instanceColumns+` FROM instances WHERE stage_id = ? ORDER BY created_at DESC`, stageID, ) if err != nil { return nil, fmt.Errorf("query instances: %w", err) @@ -77,9 +89,29 @@ func (s *Store) GetInstancesByStageID(stageID string) ([]Instance, error) { instances := []Instance{} for rows.Next() { - var inst Instance - if err := rows.Scan(&inst.ID, &inst.StageID, &inst.ProjectID, &inst.ContainerID, &inst.ImageTag, - &inst.Subdomain, &inst.NpmProxyID, &inst.Status, &inst.Port, &inst.CreatedAt, &inst.UpdatedAt); err != nil { + inst, err := scanInstance(rows) + if err != nil { + return nil, fmt.Errorf("scan instance: %w", err) + } + instances = append(instances, inst) + } + return instances, rows.Err() +} + +// ListAllInstances returns all instances across all stages. +func (s *Store) ListAllInstances() ([]Instance, error) { + rows, err := s.db.Query( + `SELECT ` + instanceColumns + ` FROM instances ORDER BY created_at DESC`, + ) + if err != nil { + return nil, fmt.Errorf("query all instances: %w", err) + } + defer rows.Close() + + instances := []Instance{} + for rows.Next() { + inst, err := scanInstance(rows) + if err != nil { return nil, fmt.Errorf("scan instance: %w", err) } instances = append(instances, inst) @@ -91,10 +123,11 @@ func (s *Store) GetInstancesByStageID(stageID string) ([]Instance, error) { func (s *Store) UpdateInstance(inst Instance) error { inst.UpdatedAt = Now() result, err := s.db.Exec( - `UPDATE instances SET stage_id=?, project_id=?, container_id=?, image_tag=?, subdomain=?, npm_proxy_id=?, status=?, port=?, updated_at=? + `UPDATE instances SET stage_id=?, project_id=?, container_id=?, image_tag=?, subdomain=?, npm_proxy_id=?, status=?, port=?, last_alive_at=?, updated_at=? WHERE id=?`, inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag, - inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, inst.UpdatedAt, inst.ID, + inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, + inst.LastAliveAt, inst.UpdatedAt, inst.ID, ) if err != nil { return fmt.Errorf("update instance: %w", err) @@ -123,6 +156,24 @@ func (s *Store) UpdateInstanceStatus(id string, status string) error { return nil } +// UpdateLastAliveAt sets the last_alive_at timestamp for an instance. +// Called when an instance is seen running. +func (s *Store) UpdateLastAliveAt(id string) error { + ts := Now() + result, err := s.db.Exec( + `UPDATE instances SET last_alive_at=?, updated_at=? WHERE id=?`, + ts, ts, id, + ) + if err != nil { + return fmt.Errorf("update last_alive_at: %w", err) + } + n, _ := result.RowsAffected() + if n == 0 { + return fmt.Errorf("instance %s: %w", id, ErrNotFound) + } + return nil +} + // DeleteInstance removes an instance by ID. func (s *Store) DeleteInstance(id string) error { result, err := s.db.Exec(`DELETE FROM instances WHERE id = ?`, id) diff --git a/internal/store/models.go b/internal/store/models.go index 5b0ed1b..cd4eedd 100644 --- a/internal/store/models.go +++ b/internal/store/models.go @@ -71,6 +71,7 @@ type Instance struct { NpmProxyID int `json:"npm_proxy_id"` Status string `json:"status"` // running, stopped, failed, removing Port int `json:"port"` + LastAliveAt string `json:"last_alive_at"` CreatedAt string `json:"created_at"` UpdatedAt string `json:"updated_at"` } diff --git a/internal/store/store.go b/internal/store/store.go index e4cf857..0ea811f 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -83,6 +83,8 @@ func (s *Store) runMigrations() error { `ALTER TABLE settings ADD COLUMN ssl_certificate_id INTEGER NOT NULL DEFAULT 0`, // Add stale_threshold_days to settings (2026-03-30). `ALTER TABLE settings ADD COLUMN stale_threshold_days INTEGER NOT NULL DEFAULT 7`, + // Add last_alive_at to instances for stale container detection (2026-03-30). + `ALTER TABLE instances ADD COLUMN last_alive_at TEXT NOT NULL DEFAULT ''`, } for _, m := range migrations {