e0a648fb0c
Critical fixes: - Fix StaleContainer frontend type to match nested backend response shape - Guard ContainerID[:12] slice against empty/short IDs in ListAllProxies High-priority fixes: - Support comma-separated severity/source in event log filtering (IN clause) - Eliminate N+1 queries in ListAllProxies and FindStaleInstances (pre-load maps) - Stop leaking internal error messages to API clients (use slog + generic msgs)
331 lines
8.5 KiB
Go
331 lines
8.5 KiB
Go
package stale
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log/slog"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/alexei/docker-watcher/internal/docker"
|
|
"github.com/alexei/docker-watcher/internal/events"
|
|
"github.com/alexei/docker-watcher/internal/store"
|
|
"github.com/robfig/cron/v3"
|
|
)
|
|
|
|
// StaleInstance holds enriched info about a stale container for API responses.
|
|
type StaleInstance struct {
|
|
Instance store.Instance `json:"instance"`
|
|
ProjectName string `json:"project_name"`
|
|
StageName string `json:"stage_name"`
|
|
DaysStale int `json:"days_stale"`
|
|
}
|
|
|
|
// Scanner periodically checks for stale containers that have been
|
|
// non-running for longer than the configured threshold.
|
|
type Scanner struct {
|
|
store *store.Store
|
|
docker *docker.Client
|
|
eventBus *events.Bus
|
|
|
|
cron *cron.Cron
|
|
mu sync.Mutex
|
|
entryID cron.EntryID
|
|
running bool
|
|
|
|
// knownStale tracks instance IDs that have already had a stale event emitted,
|
|
// to avoid re-emitting warnings for the same instance.
|
|
knownStale map[string]struct{}
|
|
}
|
|
|
|
// New creates a new stale container scanner.
|
|
func New(st *store.Store, dockerClient *docker.Client, eventBus *events.Bus) *Scanner {
|
|
return &Scanner{
|
|
store: st,
|
|
docker: dockerClient,
|
|
eventBus: eventBus,
|
|
cron: cron.New(),
|
|
knownStale: make(map[string]struct{}),
|
|
}
|
|
}
|
|
|
|
// Start begins the periodic stale container scan with the given interval (e.g., "1h", "30m").
|
|
// If the scanner is already running, it stops and restarts with the new interval.
|
|
func (s *Scanner) Start(interval string) error {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
duration, err := time.ParseDuration(interval)
|
|
if err != nil {
|
|
return fmt.Errorf("parse stale scan interval %q: %w", interval, err)
|
|
}
|
|
|
|
if s.running {
|
|
s.cron.Remove(s.entryID)
|
|
}
|
|
|
|
spec := fmt.Sprintf("@every %s", duration.String())
|
|
entryID, err := s.cron.AddFunc(spec, func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
|
defer cancel()
|
|
if scanErr := s.Scan(ctx); scanErr != nil {
|
|
slog.Warn("stale scanner: scan error", "error", scanErr)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("schedule stale scanner: %w", err)
|
|
}
|
|
|
|
s.entryID = entryID
|
|
if !s.running {
|
|
s.cron.Start()
|
|
}
|
|
s.running = true
|
|
|
|
slog.Info("stale scanner started", "interval", duration.String())
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the scanner.
|
|
func (s *Scanner) Stop() {
|
|
s.mu.Lock()
|
|
defer s.mu.Unlock()
|
|
|
|
if s.running {
|
|
ctx := s.cron.Stop()
|
|
<-ctx.Done()
|
|
s.running = false
|
|
slog.Info("stale scanner stopped")
|
|
}
|
|
}
|
|
|
|
// Scan performs a single stale-container scan cycle.
|
|
// It updates last_alive_at for running containers and detects newly stale ones.
|
|
func (s *Scanner) Scan(ctx context.Context) error {
|
|
settings, err := s.store.GetSettings()
|
|
if err != nil {
|
|
return fmt.Errorf("get settings: %w", err)
|
|
}
|
|
|
|
thresholdDays := settings.StaleThresholdDays
|
|
if thresholdDays <= 0 {
|
|
thresholdDays = 7
|
|
}
|
|
|
|
// Get all instances from the store.
|
|
instances, err := s.store.ListAllInstances()
|
|
if err != nil {
|
|
return fmt.Errorf("list all instances: %w", err)
|
|
}
|
|
|
|
if len(instances) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Get all managed Docker containers to check live state.
|
|
containers, err := s.docker.ListContainers(ctx, nil)
|
|
if err != nil {
|
|
return fmt.Errorf("list docker containers: %w", err)
|
|
}
|
|
|
|
// Build a lookup: instance ID -> container state.
|
|
containerStateByInstanceID := make(map[string]string, len(containers))
|
|
for _, c := range containers {
|
|
if c.InstanceID != "" {
|
|
containerStateByInstanceID[c.InstanceID] = c.State
|
|
}
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
currentStaleIDs := make(map[string]struct{})
|
|
|
|
for _, inst := range instances {
|
|
// Skip instances already being cleaned up.
|
|
if inst.Status == "removing" {
|
|
continue
|
|
}
|
|
|
|
dockerState := containerStateByInstanceID[inst.ID]
|
|
|
|
// If the container is running in Docker, update last_alive_at.
|
|
if dockerState == "running" {
|
|
if err := s.store.UpdateLastAliveAt(inst.ID); err != nil {
|
|
slog.Warn("stale scanner: failed to update last_alive_at",
|
|
"instance_id", inst.ID, "error", err)
|
|
}
|
|
// Also sync store status if it was out of date.
|
|
if inst.Status != "running" {
|
|
if err := s.store.UpdateInstanceStatus(inst.ID, "running"); err != nil {
|
|
slog.Warn("stale scanner: failed to sync instance status",
|
|
"instance_id", inst.ID, "error", err)
|
|
}
|
|
}
|
|
continue
|
|
}
|
|
|
|
// Container is not running. Check if it's stale.
|
|
if inst.LastAliveAt == "" {
|
|
// Never been seen running. Use created_at as fallback.
|
|
inst.LastAliveAt = inst.CreatedAt
|
|
}
|
|
|
|
lastAlive, parseErr := time.Parse("2006-01-02 15:04:05", inst.LastAliveAt)
|
|
if parseErr != nil {
|
|
slog.Warn("stale scanner: failed to parse last_alive_at",
|
|
"instance_id", inst.ID, "last_alive_at", inst.LastAliveAt, "error", parseErr)
|
|
continue
|
|
}
|
|
|
|
daysSinceAlive := int(now.Sub(lastAlive).Hours() / 24)
|
|
if daysSinceAlive < thresholdDays {
|
|
continue
|
|
}
|
|
|
|
// This instance is stale.
|
|
currentStaleIDs[inst.ID] = struct{}{}
|
|
|
|
// Emit event only if this is newly detected as stale.
|
|
if _, alreadyKnown := s.knownStale[inst.ID]; !alreadyKnown {
|
|
s.emitStaleEvent(inst, daysSinceAlive)
|
|
}
|
|
}
|
|
|
|
// Update known stale set: remove IDs that are no longer stale.
|
|
s.knownStale = currentStaleIDs
|
|
|
|
return nil
|
|
}
|
|
|
|
// FindStaleInstances returns all currently stale instances with enriched project/stage info.
|
|
func (s *Scanner) FindStaleInstances(ctx context.Context) ([]StaleInstance, error) {
|
|
settings, err := s.store.GetSettings()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("get settings: %w", err)
|
|
}
|
|
|
|
thresholdDays := settings.StaleThresholdDays
|
|
if thresholdDays <= 0 {
|
|
thresholdDays = 7
|
|
}
|
|
|
|
instances, err := s.store.ListAllInstances()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list all instances: %w", err)
|
|
}
|
|
|
|
containers, err := s.docker.ListContainers(ctx, nil)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("list docker containers: %w", err)
|
|
}
|
|
|
|
containerStateByInstanceID := make(map[string]string, len(containers))
|
|
for _, c := range containers {
|
|
if c.InstanceID != "" {
|
|
containerStateByInstanceID[c.InstanceID] = c.State
|
|
}
|
|
}
|
|
|
|
// Pre-load project and stage names to avoid N+1 queries.
|
|
allProjects, _ := s.store.GetAllProjects()
|
|
projectNames := make(map[string]string, len(allProjects))
|
|
for _, p := range allProjects {
|
|
projectNames[p.ID] = p.Name
|
|
}
|
|
stageNames := make(map[string]string)
|
|
for _, p := range allProjects {
|
|
stages, _ := s.store.GetStagesByProjectID(p.ID)
|
|
for _, st := range stages {
|
|
stageNames[st.ID] = st.Name
|
|
}
|
|
}
|
|
|
|
now := time.Now().UTC()
|
|
var result []StaleInstance
|
|
|
|
for _, inst := range instances {
|
|
if inst.Status == "removing" {
|
|
continue
|
|
}
|
|
|
|
// If Docker says it's running, it's not stale.
|
|
if containerStateByInstanceID[inst.ID] == "running" {
|
|
continue
|
|
}
|
|
|
|
lastAlive := inst.LastAliveAt
|
|
if lastAlive == "" {
|
|
lastAlive = inst.CreatedAt
|
|
}
|
|
|
|
lastAliveTime, parseErr := time.Parse("2006-01-02 15:04:05", lastAlive)
|
|
if parseErr != nil {
|
|
continue
|
|
}
|
|
|
|
daysSinceAlive := int(now.Sub(lastAliveTime).Hours() / 24)
|
|
if daysSinceAlive < thresholdDays {
|
|
continue
|
|
}
|
|
|
|
// Look up project and stage names from pre-loaded maps.
|
|
projectName := projectNames[inst.ProjectID]
|
|
if projectName == "" {
|
|
projectName = inst.ProjectID
|
|
}
|
|
stageName := stageNames[inst.StageID]
|
|
if stageName == "" {
|
|
stageName = inst.StageID
|
|
}
|
|
|
|
result = append(result, StaleInstance{
|
|
Instance: inst,
|
|
ProjectName: projectName,
|
|
StageName: stageName,
|
|
DaysStale: daysSinceAlive,
|
|
})
|
|
}
|
|
|
|
return result, nil
|
|
}
|
|
|
|
// emitStaleEvent publishes a warning event for a newly detected stale container.
|
|
func (s *Scanner) emitStaleEvent(inst store.Instance, daysStale int) {
|
|
metadata, _ := json.Marshal(map[string]any{
|
|
"instance_id": inst.ID,
|
|
"project_id": inst.ProjectID,
|
|
"stage_id": inst.StageID,
|
|
"image_tag": inst.ImageTag,
|
|
"last_alive_at": inst.LastAliveAt,
|
|
"days_stale": daysStale,
|
|
})
|
|
|
|
msg := fmt.Sprintf("Container %s (tag: %s) has been non-running for %d days",
|
|
inst.ID, inst.ImageTag, daysStale)
|
|
|
|
// Persist directly to event log.
|
|
evt, err := s.store.InsertEvent(store.EventLog{
|
|
Source: "stale_scanner",
|
|
Severity: "warn",
|
|
Message: msg,
|
|
Metadata: string(metadata),
|
|
})
|
|
if err != nil {
|
|
slog.Error("stale scanner: failed to persist event", "error", err)
|
|
return
|
|
}
|
|
|
|
// Publish for SSE clients.
|
|
s.eventBus.Publish(events.Event{
|
|
Type: events.EventLog,
|
|
Payload: events.EventLogPayload{
|
|
ID: evt.ID,
|
|
Source: "stale_scanner",
|
|
Severity: "warn",
|
|
Message: msg,
|
|
Metadata: string(metadata),
|
|
CreatedAt: evt.CreatedAt,
|
|
},
|
|
})
|
|
}
|