feat(observability): phase 2 - stale container detection
Add periodic scanner for stale containers: - Cron-based scanner (hourly) detects non-running containers exceeding threshold - last_alive_at tracking on instances, updated on deploy/start/restart - API: GET /api/containers/stale, POST cleanup (single + bulk) - Event log warnings emitted for newly stale containers - Graceful handling of externally removed containers
This commit is contained in:
@@ -25,6 +25,7 @@ import (
|
|||||||
"github.com/alexei/docker-watcher/internal/notify"
|
"github.com/alexei/docker-watcher/internal/notify"
|
||||||
"github.com/alexei/docker-watcher/internal/npm"
|
"github.com/alexei/docker-watcher/internal/npm"
|
||||||
"github.com/alexei/docker-watcher/internal/registry"
|
"github.com/alexei/docker-watcher/internal/registry"
|
||||||
|
"github.com/alexei/docker-watcher/internal/stale"
|
||||||
"github.com/alexei/docker-watcher/internal/store"
|
"github.com/alexei/docker-watcher/internal/store"
|
||||||
"github.com/alexei/docker-watcher/internal/webhook"
|
"github.com/alexei/docker-watcher/internal/webhook"
|
||||||
)
|
)
|
||||||
@@ -130,8 +131,15 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize stale container scanner.
|
||||||
|
staleScanner := stale.New(db, dockerClient, eventBus)
|
||||||
|
if err := staleScanner.Start("1h"); err != nil {
|
||||||
|
slog.Warn("failed to start stale scanner", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
// Build API server.
|
// Build API server.
|
||||||
apiServer := api.NewServer(db, dockerClient, npmClient, dep, webhookHandler, eventBus, encKey)
|
apiServer := api.NewServer(db, dockerClient, npmClient, dep, webhookHandler, eventBus, encKey)
|
||||||
|
apiServer.SetStaleScanner(staleScanner)
|
||||||
router := apiServer.Router()
|
router := apiServer.Router()
|
||||||
|
|
||||||
// Serve embedded static files for the SPA frontend.
|
// Serve embedded static files for the SPA frontend.
|
||||||
@@ -173,6 +181,7 @@ func main() {
|
|||||||
slog.Info("shutting down...")
|
slog.Info("shutting down...")
|
||||||
|
|
||||||
// Stop accepting new work.
|
// Stop accepting new work.
|
||||||
|
staleScanner.Stop()
|
||||||
poller.Stop()
|
poller.Stop()
|
||||||
|
|
||||||
// Drain in-progress deploys and notifications.
|
// Drain in-progress deploys and notifications.
|
||||||
|
|||||||
@@ -196,6 +196,13 @@ func (s *Server) controlInstance(w http.ResponseWriter, r *http.Request, action
|
|||||||
slog.Error("update instance status", "instance_id", instanceID, "status", newStatus, "error", err)
|
slog.Error("update instance status", "instance_id", instanceID, "status", newStatus, "error", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Track last_alive_at when container becomes running.
|
||||||
|
if newStatus == "running" {
|
||||||
|
if err := s.store.UpdateLastAliveAt(instanceID); err != nil {
|
||||||
|
slog.Error("update last_alive_at", "instance_id", instanceID, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
respondJSON(w, http.StatusOK, map[string]string{
|
respondJSON(w, http.StatusOK, map[string]string{
|
||||||
"instance_id": instanceID,
|
"instance_id": instanceID,
|
||||||
"action": action,
|
"action": action,
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/alexei/docker-watcher/internal/docker"
|
"github.com/alexei/docker-watcher/internal/docker"
|
||||||
"github.com/alexei/docker-watcher/internal/events"
|
"github.com/alexei/docker-watcher/internal/events"
|
||||||
"github.com/alexei/docker-watcher/internal/npm"
|
"github.com/alexei/docker-watcher/internal/npm"
|
||||||
|
"github.com/alexei/docker-watcher/internal/stale"
|
||||||
"github.com/alexei/docker-watcher/internal/store"
|
"github.com/alexei/docker-watcher/internal/store"
|
||||||
"github.com/alexei/docker-watcher/internal/webhook"
|
"github.com/alexei/docker-watcher/internal/webhook"
|
||||||
)
|
)
|
||||||
@@ -26,6 +27,7 @@ type Server struct {
|
|||||||
encKey [32]byte
|
encKey [32]byte
|
||||||
localAuth *auth.LocalAuth
|
localAuth *auth.LocalAuth
|
||||||
oidcProvider *auth.OIDCProvider
|
oidcProvider *auth.OIDCProvider
|
||||||
|
staleScanner *stale.Scanner
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewServer creates a new API Server with all required dependencies.
|
// NewServer creates a new API Server with all required dependencies.
|
||||||
@@ -60,6 +62,12 @@ func NewServer(
|
|||||||
return s
|
return s
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetStaleScanner sets the stale scanner on the server.
|
||||||
|
// Called after both the API server and scanner are initialized.
|
||||||
|
func (s *Server) SetStaleScanner(scanner *stale.Scanner) {
|
||||||
|
s.staleScanner = scanner
|
||||||
|
}
|
||||||
|
|
||||||
// initOIDCProvider creates an OIDC provider from settings. Errors are logged, not fatal.
|
// initOIDCProvider creates an OIDC provider from settings. Errors are logged, not fatal.
|
||||||
func (s *Server) initOIDCProvider(ctx context.Context, as store.AuthSettings) {
|
func (s *Server) initOIDCProvider(ctx context.Context, as store.AuthSettings) {
|
||||||
// Decrypt the OIDC client secret if it's encrypted.
|
// Decrypt the OIDC client secret if it's encrypted.
|
||||||
@@ -135,6 +143,9 @@ func (s *Server) Router() chi.Router {
|
|||||||
r.Get("/settings", s.getSettings)
|
r.Get("/settings", s.getSettings)
|
||||||
r.Get("/settings/npm-certificates", s.listNpmCertificates)
|
r.Get("/settings/npm-certificates", s.listNpmCertificates)
|
||||||
|
|
||||||
|
// Stale container endpoints.
|
||||||
|
r.Get("/containers/stale", s.listStaleContainers)
|
||||||
|
|
||||||
// Admin-only routes: require admin role.
|
// Admin-only routes: require admin role.
|
||||||
r.Group(func(r chi.Router) {
|
r.Group(func(r chi.Router) {
|
||||||
r.Use(auth.AdminOnly)
|
r.Use(auth.AdminOnly)
|
||||||
@@ -192,6 +203,11 @@ func (s *Server) Router() chi.Router {
|
|||||||
r.Post("/test", s.testRegistry)
|
r.Post("/test", s.testRegistry)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// Stale container cleanup endpoints (admin-only).
|
||||||
|
// Bulk route must be registered before parameterized route.
|
||||||
|
r.Post("/containers/stale/cleanup", s.bulkCleanupStaleContainers)
|
||||||
|
r.Post("/containers/stale/{id}/cleanup", s.cleanupStaleContainer)
|
||||||
|
|
||||||
// Settings endpoints.
|
// Settings endpoints.
|
||||||
r.Put("/settings", s.updateSettings)
|
r.Put("/settings", s.updateSettings)
|
||||||
r.Get("/settings/webhook-url", s.getWebhookURL)
|
r.Get("/settings/webhook-url", s.getWebhookURL)
|
||||||
|
|||||||
@@ -0,0 +1,172 @@
|
|||||||
|
package api
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"log/slog"
|
||||||
|
"net/http"
|
||||||
|
|
||||||
|
"github.com/go-chi/chi/v5"
|
||||||
|
|
||||||
|
"github.com/alexei/docker-watcher/internal/crypto"
|
||||||
|
"github.com/alexei/docker-watcher/internal/events"
|
||||||
|
"github.com/alexei/docker-watcher/internal/stale"
|
||||||
|
"github.com/alexei/docker-watcher/internal/store"
|
||||||
|
)
|
||||||
|
|
||||||
|
// listStaleContainers handles GET /api/containers/stale.
|
||||||
|
func (s *Server) listStaleContainers(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if s.staleScanner == nil {
|
||||||
|
respondError(w, http.StatusServiceUnavailable, "stale scanner not initialized")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
staleInstances, err := s.staleScanner.FindStaleInstances(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
respondError(w, http.StatusInternalServerError, "failed to find stale containers: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if staleInstances == nil {
|
||||||
|
staleInstances = []stale.StaleInstance{}
|
||||||
|
}
|
||||||
|
respondJSON(w, http.StatusOK, staleInstances)
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupStaleContainer handles POST /api/containers/stale/{id}/cleanup.
|
||||||
|
// Stops the Docker container, removes the NPM proxy, and deletes the instance from the store.
|
||||||
|
func (s *Server) cleanupStaleContainer(w http.ResponseWriter, r *http.Request) {
|
||||||
|
instanceID := chi.URLParam(r, "id")
|
||||||
|
|
||||||
|
inst, err := s.store.GetInstanceByID(instanceID)
|
||||||
|
if err != nil {
|
||||||
|
if errors.Is(err, store.ErrNotFound) {
|
||||||
|
respondNotFound(w, "instance")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
respondError(w, http.StatusInternalServerError, "failed to get instance: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Don't remove instances already being cleaned up.
|
||||||
|
if inst.Status == "removing" {
|
||||||
|
respondError(w, http.StatusConflict, "instance is already being removed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := s.cleanupInstance(r, inst); err != nil {
|
||||||
|
respondError(w, http.StatusInternalServerError, "failed to cleanup instance: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
respondJSON(w, http.StatusOK, map[string]string{"cleaned": instanceID})
|
||||||
|
}
|
||||||
|
|
||||||
|
// bulkCleanupStaleContainers handles POST /api/containers/stale/cleanup.
|
||||||
|
// Cleans up all currently stale containers.
|
||||||
|
func (s *Server) bulkCleanupStaleContainers(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if s.staleScanner == nil {
|
||||||
|
respondError(w, http.StatusServiceUnavailable, "stale scanner not initialized")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
staleInstances, err := s.staleScanner.FindStaleInstances(r.Context())
|
||||||
|
if err != nil {
|
||||||
|
respondError(w, http.StatusInternalServerError, "failed to find stale containers: "+err.Error())
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var cleaned []string
|
||||||
|
var failed []string
|
||||||
|
|
||||||
|
for _, si := range staleInstances {
|
||||||
|
if si.Instance.Status == "removing" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := s.cleanupInstance(r, si.Instance); err != nil {
|
||||||
|
slog.Error("bulk stale cleanup failed",
|
||||||
|
"instance_id", si.Instance.ID, "error", err)
|
||||||
|
failed = append(failed, si.Instance.ID)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cleaned = append(cleaned, si.Instance.ID)
|
||||||
|
}
|
||||||
|
|
||||||
|
respondJSON(w, http.StatusOK, map[string]any{
|
||||||
|
"cleaned": cleaned,
|
||||||
|
"failed": failed,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// cleanupInstance stops a Docker container, removes the NPM proxy, deletes
|
||||||
|
// the store record, and emits an event.
|
||||||
|
func (s *Server) cleanupInstance(r *http.Request, inst store.Instance) error {
|
||||||
|
ctx := r.Context()
|
||||||
|
|
||||||
|
// Mark as removing.
|
||||||
|
if err := s.store.UpdateInstanceStatus(inst.ID, "removing"); err != nil {
|
||||||
|
slog.Warn("stale cleanup: update status to removing", "instance_id", inst.ID, "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop and remove Docker container.
|
||||||
|
if inst.ContainerID != "" {
|
||||||
|
if err := s.docker.StopContainer(ctx, inst.ContainerID, 10); err != nil {
|
||||||
|
slog.Warn("stale cleanup: stop container", "container_id", inst.ContainerID, "error", err)
|
||||||
|
}
|
||||||
|
if err := s.docker.RemoveContainer(ctx, inst.ContainerID, true); err != nil {
|
||||||
|
slog.Warn("stale cleanup: remove container", "container_id", inst.ContainerID, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete NPM proxy host if present.
|
||||||
|
if inst.NpmProxyID > 0 {
|
||||||
|
settings, err := s.store.GetSettings()
|
||||||
|
if err == nil {
|
||||||
|
npmPassword, err := crypto.Decrypt(s.encKey, settings.NpmPassword)
|
||||||
|
if err == nil {
|
||||||
|
if authErr := s.npm.Authenticate(ctx, settings.NpmEmail, npmPassword); authErr == nil {
|
||||||
|
if delErr := s.npm.DeleteProxyHost(ctx, inst.NpmProxyID); delErr != nil {
|
||||||
|
slog.Warn("stale cleanup: delete proxy host", "proxy_id", inst.NpmProxyID, "error", delErr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete instance record.
|
||||||
|
if err := s.store.DeleteInstance(inst.ID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Emit cleanup event.
|
||||||
|
s.emitStaleCleanupEvent(inst)
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// emitStaleCleanupEvent publishes an event when a stale container is cleaned up.
|
||||||
|
func (s *Server) emitStaleCleanupEvent(inst store.Instance) {
|
||||||
|
msg := "Stale container cleaned up: " + inst.ID + " (tag: " + inst.ImageTag + ")"
|
||||||
|
|
||||||
|
evt, err := s.store.InsertEvent(store.EventLog{
|
||||||
|
Source: "stale_cleanup",
|
||||||
|
Severity: "info",
|
||||||
|
Message: msg,
|
||||||
|
Metadata: `{"instance_id":"` + inst.ID + `","project_id":"` + inst.ProjectID + `","stage_id":"` + inst.StageID + `"}`,
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("stale cleanup: failed to persist event", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
s.eventBus.Publish(events.Event{
|
||||||
|
Type: events.EventLog,
|
||||||
|
Payload: events.EventLogPayload{
|
||||||
|
ID: evt.ID,
|
||||||
|
Source: "stale_cleanup",
|
||||||
|
Severity: "info",
|
||||||
|
Message: msg,
|
||||||
|
Metadata: evt.Metadata,
|
||||||
|
CreatedAt: evt.CreatedAt,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
@@ -333,6 +333,9 @@ func (d *Deployer) executeDeploy(
|
|||||||
if err := d.store.UpdateInstanceStatus(instanceID, "running"); err != nil {
|
if err := d.store.UpdateInstanceStatus(instanceID, "running"); err != nil {
|
||||||
slog.Warn("update instance status to running", "error", err)
|
slog.Warn("update instance status to running", "error", err)
|
||||||
}
|
}
|
||||||
|
if err := d.store.UpdateLastAliveAt(instanceID); err != nil {
|
||||||
|
slog.Warn("update last_alive_at on deploy", "instance_id", instanceID, "error", err)
|
||||||
|
}
|
||||||
d.publishInstanceStatus(instanceID, project.ID, stage.ID, "running")
|
d.publishInstanceStatus(instanceID, project.ID, stage.ID, "running")
|
||||||
d.logDeploy(deployID, "Container started", "info")
|
d.logDeploy(deployID, "Container started", "info")
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,316 @@
|
|||||||
|
package stale
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"log/slog"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/alexei/docker-watcher/internal/docker"
|
||||||
|
"github.com/alexei/docker-watcher/internal/events"
|
||||||
|
"github.com/alexei/docker-watcher/internal/store"
|
||||||
|
"github.com/robfig/cron/v3"
|
||||||
|
)
|
||||||
|
|
||||||
|
// StaleInstance holds enriched info about a stale container for API responses.
|
||||||
|
type StaleInstance struct {
|
||||||
|
Instance store.Instance `json:"instance"`
|
||||||
|
ProjectName string `json:"project_name"`
|
||||||
|
StageName string `json:"stage_name"`
|
||||||
|
DaysStale int `json:"days_stale"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scanner periodically checks for stale containers that have been
|
||||||
|
// non-running for longer than the configured threshold.
|
||||||
|
type Scanner struct {
|
||||||
|
store *store.Store
|
||||||
|
docker *docker.Client
|
||||||
|
eventBus *events.Bus
|
||||||
|
|
||||||
|
cron *cron.Cron
|
||||||
|
mu sync.Mutex
|
||||||
|
entryID cron.EntryID
|
||||||
|
running bool
|
||||||
|
|
||||||
|
// knownStale tracks instance IDs that have already had a stale event emitted,
|
||||||
|
// to avoid re-emitting warnings for the same instance.
|
||||||
|
knownStale map[string]struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// New creates a new stale container scanner.
|
||||||
|
func New(st *store.Store, dockerClient *docker.Client, eventBus *events.Bus) *Scanner {
|
||||||
|
return &Scanner{
|
||||||
|
store: st,
|
||||||
|
docker: dockerClient,
|
||||||
|
eventBus: eventBus,
|
||||||
|
cron: cron.New(),
|
||||||
|
knownStale: make(map[string]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start begins the periodic stale container scan with the given interval (e.g., "1h", "30m").
|
||||||
|
// If the scanner is already running, it stops and restarts with the new interval.
|
||||||
|
func (s *Scanner) Start(interval string) error {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
duration, err := time.ParseDuration(interval)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("parse stale scan interval %q: %w", interval, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if s.running {
|
||||||
|
s.cron.Remove(s.entryID)
|
||||||
|
}
|
||||||
|
|
||||||
|
spec := fmt.Sprintf("@every %s", duration.String())
|
||||||
|
entryID, err := s.cron.AddFunc(spec, func() {
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||||
|
defer cancel()
|
||||||
|
if scanErr := s.Scan(ctx); scanErr != nil {
|
||||||
|
slog.Warn("stale scanner: scan error", "error", scanErr)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("schedule stale scanner: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.entryID = entryID
|
||||||
|
if !s.running {
|
||||||
|
s.cron.Start()
|
||||||
|
}
|
||||||
|
s.running = true
|
||||||
|
|
||||||
|
slog.Info("stale scanner started", "interval", duration.String())
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Stop gracefully shuts down the scanner.
|
||||||
|
func (s *Scanner) Stop() {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if s.running {
|
||||||
|
ctx := s.cron.Stop()
|
||||||
|
<-ctx.Done()
|
||||||
|
s.running = false
|
||||||
|
slog.Info("stale scanner stopped")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Scan performs a single stale-container scan cycle.
|
||||||
|
// It updates last_alive_at for running containers and detects newly stale ones.
|
||||||
|
func (s *Scanner) Scan(ctx context.Context) error {
|
||||||
|
settings, err := s.store.GetSettings()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get settings: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
thresholdDays := settings.StaleThresholdDays
|
||||||
|
if thresholdDays <= 0 {
|
||||||
|
thresholdDays = 7
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all instances from the store.
|
||||||
|
instances, err := s.store.ListAllInstances()
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("list all instances: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(instances) == 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get all managed Docker containers to check live state.
|
||||||
|
containers, err := s.docker.ListContainers(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("list docker containers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build a lookup: instance ID -> container state.
|
||||||
|
containerStateByInstanceID := make(map[string]string, len(containers))
|
||||||
|
for _, c := range containers {
|
||||||
|
if c.InstanceID != "" {
|
||||||
|
containerStateByInstanceID[c.InstanceID] = c.State
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().UTC()
|
||||||
|
currentStaleIDs := make(map[string]struct{})
|
||||||
|
|
||||||
|
for _, inst := range instances {
|
||||||
|
// Skip instances already being cleaned up.
|
||||||
|
if inst.Status == "removing" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
dockerState := containerStateByInstanceID[inst.ID]
|
||||||
|
|
||||||
|
// If the container is running in Docker, update last_alive_at.
|
||||||
|
if dockerState == "running" {
|
||||||
|
if err := s.store.UpdateLastAliveAt(inst.ID); err != nil {
|
||||||
|
slog.Warn("stale scanner: failed to update last_alive_at",
|
||||||
|
"instance_id", inst.ID, "error", err)
|
||||||
|
}
|
||||||
|
// Also sync store status if it was out of date.
|
||||||
|
if inst.Status != "running" {
|
||||||
|
if err := s.store.UpdateInstanceStatus(inst.ID, "running"); err != nil {
|
||||||
|
slog.Warn("stale scanner: failed to sync instance status",
|
||||||
|
"instance_id", inst.ID, "error", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Container is not running. Check if it's stale.
|
||||||
|
if inst.LastAliveAt == "" {
|
||||||
|
// Never been seen running. Use created_at as fallback.
|
||||||
|
inst.LastAliveAt = inst.CreatedAt
|
||||||
|
}
|
||||||
|
|
||||||
|
lastAlive, parseErr := time.Parse("2006-01-02 15:04:05", inst.LastAliveAt)
|
||||||
|
if parseErr != nil {
|
||||||
|
slog.Warn("stale scanner: failed to parse last_alive_at",
|
||||||
|
"instance_id", inst.ID, "last_alive_at", inst.LastAliveAt, "error", parseErr)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
daysSinceAlive := int(now.Sub(lastAlive).Hours() / 24)
|
||||||
|
if daysSinceAlive < thresholdDays {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// This instance is stale.
|
||||||
|
currentStaleIDs[inst.ID] = struct{}{}
|
||||||
|
|
||||||
|
// Emit event only if this is newly detected as stale.
|
||||||
|
if _, alreadyKnown := s.knownStale[inst.ID]; !alreadyKnown {
|
||||||
|
s.emitStaleEvent(inst, daysSinceAlive)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update known stale set: remove IDs that are no longer stale.
|
||||||
|
s.knownStale = currentStaleIDs
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// FindStaleInstances returns all currently stale instances with enriched project/stage info.
|
||||||
|
func (s *Scanner) FindStaleInstances(ctx context.Context) ([]StaleInstance, error) {
|
||||||
|
settings, err := s.store.GetSettings()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("get settings: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
thresholdDays := settings.StaleThresholdDays
|
||||||
|
if thresholdDays <= 0 {
|
||||||
|
thresholdDays = 7
|
||||||
|
}
|
||||||
|
|
||||||
|
instances, err := s.store.ListAllInstances()
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("list all instances: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
containers, err := s.docker.ListContainers(ctx, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("list docker containers: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
containerStateByInstanceID := make(map[string]string, len(containers))
|
||||||
|
for _, c := range containers {
|
||||||
|
if c.InstanceID != "" {
|
||||||
|
containerStateByInstanceID[c.InstanceID] = c.State
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
now := time.Now().UTC()
|
||||||
|
var result []StaleInstance
|
||||||
|
|
||||||
|
for _, inst := range instances {
|
||||||
|
if inst.Status == "removing" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// If Docker says it's running, it's not stale.
|
||||||
|
if containerStateByInstanceID[inst.ID] == "running" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
lastAlive := inst.LastAliveAt
|
||||||
|
if lastAlive == "" {
|
||||||
|
lastAlive = inst.CreatedAt
|
||||||
|
}
|
||||||
|
|
||||||
|
lastAliveTime, parseErr := time.Parse("2006-01-02 15:04:05", lastAlive)
|
||||||
|
if parseErr != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
daysSinceAlive := int(now.Sub(lastAliveTime).Hours() / 24)
|
||||||
|
if daysSinceAlive < thresholdDays {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Look up project and stage names.
|
||||||
|
projectName := inst.ProjectID
|
||||||
|
stageName := inst.StageID
|
||||||
|
if proj, err := s.store.GetProjectByID(inst.ProjectID); err == nil {
|
||||||
|
projectName = proj.Name
|
||||||
|
}
|
||||||
|
if stg, err := s.store.GetStageByID(inst.StageID); err == nil {
|
||||||
|
stageName = stg.Name
|
||||||
|
}
|
||||||
|
|
||||||
|
result = append(result, StaleInstance{
|
||||||
|
Instance: inst,
|
||||||
|
ProjectName: projectName,
|
||||||
|
StageName: stageName,
|
||||||
|
DaysStale: daysSinceAlive,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return result, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// emitStaleEvent publishes a warning event for a newly detected stale container.
|
||||||
|
func (s *Scanner) emitStaleEvent(inst store.Instance, daysStale int) {
|
||||||
|
metadata, _ := json.Marshal(map[string]any{
|
||||||
|
"instance_id": inst.ID,
|
||||||
|
"project_id": inst.ProjectID,
|
||||||
|
"stage_id": inst.StageID,
|
||||||
|
"image_tag": inst.ImageTag,
|
||||||
|
"last_alive_at": inst.LastAliveAt,
|
||||||
|
"days_stale": daysStale,
|
||||||
|
})
|
||||||
|
|
||||||
|
msg := fmt.Sprintf("Container %s (tag: %s) has been non-running for %d days",
|
||||||
|
inst.ID, inst.ImageTag, daysStale)
|
||||||
|
|
||||||
|
// Persist directly to event log.
|
||||||
|
evt, err := s.store.InsertEvent(store.EventLog{
|
||||||
|
Source: "stale_scanner",
|
||||||
|
Severity: "warn",
|
||||||
|
Message: msg,
|
||||||
|
Metadata: string(metadata),
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
slog.Error("stale scanner: failed to persist event", "error", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Publish for SSE clients.
|
||||||
|
s.eventBus.Publish(events.Event{
|
||||||
|
Type: events.EventLog,
|
||||||
|
Payload: events.EventLogPayload{
|
||||||
|
ID: evt.ID,
|
||||||
|
Source: "stale_scanner",
|
||||||
|
Severity: "warn",
|
||||||
|
Message: msg,
|
||||||
|
Metadata: string(metadata),
|
||||||
|
CreatedAt: evt.CreatedAt,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
}
|
||||||
+70
-19
@@ -8,6 +8,20 @@ import (
|
|||||||
"github.com/google/uuid"
|
"github.com/google/uuid"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// instanceColumns is the canonical column list for instance queries.
|
||||||
|
const instanceColumns = `id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, last_alive_at, created_at, updated_at`
|
||||||
|
|
||||||
|
// scanInstance scans a row into an Instance struct using the canonical column order.
|
||||||
|
func scanInstance(scanner interface{ Scan(...any) error }) (Instance, error) {
|
||||||
|
var inst Instance
|
||||||
|
err := scanner.Scan(
|
||||||
|
&inst.ID, &inst.StageID, &inst.ProjectID, &inst.ContainerID, &inst.ImageTag,
|
||||||
|
&inst.Subdomain, &inst.NpmProxyID, &inst.Status, &inst.Port,
|
||||||
|
&inst.LastAliveAt, &inst.CreatedAt, &inst.UpdatedAt,
|
||||||
|
)
|
||||||
|
return inst, err
|
||||||
|
}
|
||||||
|
|
||||||
// CreateInstance inserts a new instance record.
|
// CreateInstance inserts a new instance record.
|
||||||
func (s *Store) CreateInstance(inst Instance) (Instance, error) {
|
func (s *Store) CreateInstance(inst Instance) (Instance, error) {
|
||||||
inst.ID = uuid.New().String()
|
inst.ID = uuid.New().String()
|
||||||
@@ -15,10 +29,11 @@ func (s *Store) CreateInstance(inst Instance) (Instance, error) {
|
|||||||
inst.UpdatedAt = inst.CreatedAt
|
inst.UpdatedAt = inst.CreatedAt
|
||||||
|
|
||||||
_, err := s.db.Exec(
|
_, err := s.db.Exec(
|
||||||
`INSERT INTO instances (id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, created_at, updated_at)
|
`INSERT INTO instances (`+instanceColumns+`)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||||
inst.ID, inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag,
|
inst.ID, inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag,
|
||||||
inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, inst.CreatedAt, inst.UpdatedAt,
|
inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port,
|
||||||
|
inst.LastAliveAt, inst.CreatedAt, inst.UpdatedAt,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Instance{}, fmt.Errorf("insert instance: %w", err)
|
return Instance{}, fmt.Errorf("insert instance: %w", err)
|
||||||
@@ -36,10 +51,11 @@ func (s *Store) CreateInstanceWithID(inst Instance) (Instance, error) {
|
|||||||
inst.UpdatedAt = inst.CreatedAt
|
inst.UpdatedAt = inst.CreatedAt
|
||||||
|
|
||||||
_, err := s.db.Exec(
|
_, err := s.db.Exec(
|
||||||
`INSERT INTO instances (id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, created_at, updated_at)
|
`INSERT INTO instances (`+instanceColumns+`)
|
||||||
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
||||||
inst.ID, inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag,
|
inst.ID, inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag,
|
||||||
inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, inst.CreatedAt, inst.UpdatedAt,
|
inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port,
|
||||||
|
inst.LastAliveAt, inst.CreatedAt, inst.UpdatedAt,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return Instance{}, fmt.Errorf("insert instance: %w", err)
|
return Instance{}, fmt.Errorf("insert instance: %w", err)
|
||||||
@@ -49,12 +65,9 @@ func (s *Store) CreateInstanceWithID(inst Instance) (Instance, error) {
|
|||||||
|
|
||||||
// GetInstanceByID returns a single instance by its ID.
|
// GetInstanceByID returns a single instance by its ID.
|
||||||
func (s *Store) GetInstanceByID(id string) (Instance, error) {
|
func (s *Store) GetInstanceByID(id string) (Instance, error) {
|
||||||
var inst Instance
|
inst, err := scanInstance(s.db.QueryRow(
|
||||||
err := s.db.QueryRow(
|
`SELECT `+instanceColumns+` FROM instances WHERE id = ?`, id,
|
||||||
`SELECT id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, created_at, updated_at
|
))
|
||||||
FROM instances WHERE id = ?`, id,
|
|
||||||
).Scan(&inst.ID, &inst.StageID, &inst.ProjectID, &inst.ContainerID, &inst.ImageTag,
|
|
||||||
&inst.Subdomain, &inst.NpmProxyID, &inst.Status, &inst.Port, &inst.CreatedAt, &inst.UpdatedAt)
|
|
||||||
if errors.Is(err, sql.ErrNoRows) {
|
if errors.Is(err, sql.ErrNoRows) {
|
||||||
return Instance{}, fmt.Errorf("instance %s: %w", id, ErrNotFound)
|
return Instance{}, fmt.Errorf("instance %s: %w", id, ErrNotFound)
|
||||||
}
|
}
|
||||||
@@ -67,8 +80,7 @@ func (s *Store) GetInstanceByID(id string) (Instance, error) {
|
|||||||
// GetInstancesByStageID returns all instances for a given stage.
|
// GetInstancesByStageID returns all instances for a given stage.
|
||||||
func (s *Store) GetInstancesByStageID(stageID string) ([]Instance, error) {
|
func (s *Store) GetInstancesByStageID(stageID string) ([]Instance, error) {
|
||||||
rows, err := s.db.Query(
|
rows, err := s.db.Query(
|
||||||
`SELECT id, stage_id, project_id, container_id, image_tag, subdomain, npm_proxy_id, status, port, created_at, updated_at
|
`SELECT `+instanceColumns+` FROM instances WHERE stage_id = ? ORDER BY created_at DESC`, stageID,
|
||||||
FROM instances WHERE stage_id = ? ORDER BY created_at DESC`, stageID,
|
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("query instances: %w", err)
|
return nil, fmt.Errorf("query instances: %w", err)
|
||||||
@@ -77,9 +89,29 @@ func (s *Store) GetInstancesByStageID(stageID string) ([]Instance, error) {
|
|||||||
|
|
||||||
instances := []Instance{}
|
instances := []Instance{}
|
||||||
for rows.Next() {
|
for rows.Next() {
|
||||||
var inst Instance
|
inst, err := scanInstance(rows)
|
||||||
if err := rows.Scan(&inst.ID, &inst.StageID, &inst.ProjectID, &inst.ContainerID, &inst.ImageTag,
|
if err != nil {
|
||||||
&inst.Subdomain, &inst.NpmProxyID, &inst.Status, &inst.Port, &inst.CreatedAt, &inst.UpdatedAt); err != nil {
|
return nil, fmt.Errorf("scan instance: %w", err)
|
||||||
|
}
|
||||||
|
instances = append(instances, inst)
|
||||||
|
}
|
||||||
|
return instances, rows.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ListAllInstances returns all instances across all stages.
|
||||||
|
func (s *Store) ListAllInstances() ([]Instance, error) {
|
||||||
|
rows, err := s.db.Query(
|
||||||
|
`SELECT ` + instanceColumns + ` FROM instances ORDER BY created_at DESC`,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query all instances: %w", err)
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
|
||||||
|
instances := []Instance{}
|
||||||
|
for rows.Next() {
|
||||||
|
inst, err := scanInstance(rows)
|
||||||
|
if err != nil {
|
||||||
return nil, fmt.Errorf("scan instance: %w", err)
|
return nil, fmt.Errorf("scan instance: %w", err)
|
||||||
}
|
}
|
||||||
instances = append(instances, inst)
|
instances = append(instances, inst)
|
||||||
@@ -91,10 +123,11 @@ func (s *Store) GetInstancesByStageID(stageID string) ([]Instance, error) {
|
|||||||
func (s *Store) UpdateInstance(inst Instance) error {
|
func (s *Store) UpdateInstance(inst Instance) error {
|
||||||
inst.UpdatedAt = Now()
|
inst.UpdatedAt = Now()
|
||||||
result, err := s.db.Exec(
|
result, err := s.db.Exec(
|
||||||
`UPDATE instances SET stage_id=?, project_id=?, container_id=?, image_tag=?, subdomain=?, npm_proxy_id=?, status=?, port=?, updated_at=?
|
`UPDATE instances SET stage_id=?, project_id=?, container_id=?, image_tag=?, subdomain=?, npm_proxy_id=?, status=?, port=?, last_alive_at=?, updated_at=?
|
||||||
WHERE id=?`,
|
WHERE id=?`,
|
||||||
inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag,
|
inst.StageID, inst.ProjectID, inst.ContainerID, inst.ImageTag,
|
||||||
inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port, inst.UpdatedAt, inst.ID,
|
inst.Subdomain, inst.NpmProxyID, inst.Status, inst.Port,
|
||||||
|
inst.LastAliveAt, inst.UpdatedAt, inst.ID,
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("update instance: %w", err)
|
return fmt.Errorf("update instance: %w", err)
|
||||||
@@ -123,6 +156,24 @@ func (s *Store) UpdateInstanceStatus(id string, status string) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// UpdateLastAliveAt sets the last_alive_at timestamp for an instance.
|
||||||
|
// Called when an instance is seen running.
|
||||||
|
func (s *Store) UpdateLastAliveAt(id string) error {
|
||||||
|
ts := Now()
|
||||||
|
result, err := s.db.Exec(
|
||||||
|
`UPDATE instances SET last_alive_at=?, updated_at=? WHERE id=?`,
|
||||||
|
ts, ts, id,
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("update last_alive_at: %w", err)
|
||||||
|
}
|
||||||
|
n, _ := result.RowsAffected()
|
||||||
|
if n == 0 {
|
||||||
|
return fmt.Errorf("instance %s: %w", id, ErrNotFound)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// DeleteInstance removes an instance by ID.
|
// DeleteInstance removes an instance by ID.
|
||||||
func (s *Store) DeleteInstance(id string) error {
|
func (s *Store) DeleteInstance(id string) error {
|
||||||
result, err := s.db.Exec(`DELETE FROM instances WHERE id = ?`, id)
|
result, err := s.db.Exec(`DELETE FROM instances WHERE id = ?`, id)
|
||||||
|
|||||||
@@ -71,6 +71,7 @@ type Instance struct {
|
|||||||
NpmProxyID int `json:"npm_proxy_id"`
|
NpmProxyID int `json:"npm_proxy_id"`
|
||||||
Status string `json:"status"` // running, stopped, failed, removing
|
Status string `json:"status"` // running, stopped, failed, removing
|
||||||
Port int `json:"port"`
|
Port int `json:"port"`
|
||||||
|
LastAliveAt string `json:"last_alive_at"`
|
||||||
CreatedAt string `json:"created_at"`
|
CreatedAt string `json:"created_at"`
|
||||||
UpdatedAt string `json:"updated_at"`
|
UpdatedAt string `json:"updated_at"`
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -83,6 +83,8 @@ func (s *Store) runMigrations() error {
|
|||||||
`ALTER TABLE settings ADD COLUMN ssl_certificate_id INTEGER NOT NULL DEFAULT 0`,
|
`ALTER TABLE settings ADD COLUMN ssl_certificate_id INTEGER NOT NULL DEFAULT 0`,
|
||||||
// Add stale_threshold_days to settings (2026-03-30).
|
// Add stale_threshold_days to settings (2026-03-30).
|
||||||
`ALTER TABLE settings ADD COLUMN stale_threshold_days INTEGER NOT NULL DEFAULT 7`,
|
`ALTER TABLE settings ADD COLUMN stale_threshold_days INTEGER NOT NULL DEFAULT 7`,
|
||||||
|
// Add last_alive_at to instances for stale container detection (2026-03-30).
|
||||||
|
`ALTER TABLE instances ADD COLUMN last_alive_at TEXT NOT NULL DEFAULT ''`,
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, m := range migrations {
|
for _, m := range migrations {
|
||||||
|
|||||||
Reference in New Issue
Block a user