// Package stale finds containers that have been non-running for longer than
// a configurable threshold and emits warning events for them.
package stale

import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"sync"
	"time"

	"github.com/alexei/tinyforge/internal/docker"
	"github.com/alexei/tinyforge/internal/events"
	"github.com/alexei/tinyforge/internal/store"
	"github.com/robfig/cron/v3"
)

// StaleContainer is a stale container row enriched with the human-readable
// labels needed to render the Stale view (workload + role + days).
//
// JSON shape uses container_id semantics — the frontend type was historically
// "Instance"; after the workload refactor it consumes Container fields directly.
type StaleContainer struct {
	// Container is the stored container row the entry was built from.
	Container store.Container `json:"container"`
	// WorkloadID is copied from the container row.
	WorkloadID string `json:"workload_id"`
	// WorkloadName is the workload's name, or the workload ID when no name
	// could be resolved.
	WorkloadName string `json:"workload_name"`
	// Role is copied from the container row.
	Role string `json:"role"`
	// DaysStale is the number of whole days since the container was last
	// seen alive.
	DaysStale int `json:"days_stale"`
}

// Scanner periodically checks for containers that have been non-running for
// longer than the configured threshold.
type Scanner struct {
	// store provides settings and container rows and persists emitted events.
	store *store.Store
	// docker is queried for the live state of containers.
	docker *docker.Client
	// eventBus receives a published event for each newly stale container.
	eventBus *events.Bus
	// cron schedules the periodic scan.
	cron *cron.Cron

	// mu is held by Start and Stop while they touch entryID and running.
	mu sync.Mutex
	// entryID identifies the scheduled scan job inside cron.
	entryID cron.EntryID
	// running reports whether the cron scheduler has been started.
	running bool

	// knownStale tracks container row IDs that have already had a stale event
	// emitted, to avoid re-emitting the same warning on every tick.
	//
	// NOTE(review): Scan reads and replaces this map without holding mu —
	// confirm that Scan is never executed concurrently.
	knownStale map[string]struct{}
}

// New creates a new stale container scanner.
//
// The returned Scanner is idle: nothing is scheduled until Start is called.
func New(st *store.Store, dockerClient *docker.Client, eventBus *events.Bus) *Scanner {
	return &Scanner{
		store:      st,
		docker:     dockerClient,
		eventBus:   eventBus,
		cron:       cron.New(),
		knownStale: make(map[string]struct{}),
	}
}

// Start begins the periodic stale container scan with the given interval (e.g., "1h", "30m").
// If the scanner is already running, it stops and restarts with the new interval.
func (s *Scanner) Start(interval string) error { s.mu.Lock() defer s.mu.Unlock() duration, err := time.ParseDuration(interval) if err != nil { return fmt.Errorf("parse stale scan interval %q: %w", interval, err) } if s.running { s.cron.Remove(s.entryID) } spec := fmt.Sprintf("@every %s", duration.String()) entryID, err := s.cron.AddFunc(spec, func() { ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) defer cancel() if scanErr := s.Scan(ctx); scanErr != nil { slog.Warn("stale scanner: scan error", "error", scanErr) } }) if err != nil { return fmt.Errorf("schedule stale scanner: %w", err) } s.entryID = entryID if !s.running { s.cron.Start() } s.running = true slog.Info("stale scanner started", "interval", duration.String()) return nil } // Stop gracefully shuts down the scanner. func (s *Scanner) Stop() { s.mu.Lock() defer s.mu.Unlock() if s.running { ctx := s.cron.Stop() <-ctx.Done() s.running = false slog.Info("stale scanner stopped") } } // Scan performs a single stale-container scan cycle. // Updates last_seen_at for running containers and detects newly stale ones. func (s *Scanner) Scan(ctx context.Context) error { settings, err := s.store.GetSettings() if err != nil { return fmt.Errorf("get settings: %w", err) } thresholdDays := settings.StaleThresholdDays if thresholdDays <= 0 { thresholdDays = 7 } containers, err := s.store.ListContainers(store.ContainerFilter{}) if err != nil { return fmt.Errorf("list containers: %w", err) } if len(containers) == 0 { return nil } // Live state from Docker, indexed by container_id label so we can // reconcile on a single pass. 
dockerContainers, err := s.docker.ListContainers(ctx, nil) if err != nil { return fmt.Errorf("list docker containers: %w", err) } stateByContainerID := make(map[string]string, len(dockerContainers)) for _, dc := range dockerContainers { stateByContainerID[dc.ID] = dc.State } now := time.Now().UTC() currentStaleIDs := make(map[string]struct{}) for _, c := range containers { if c.State == "removing" { continue } dockerState := stateByContainerID[c.ContainerID] if dockerState == "running" { if err := s.store.UpdateContainerState(c.ID, "running"); err != nil { slog.Warn("stale scanner: failed to update state", "id", c.ID, "error", err) } continue } // Container is not running. Check staleness against last_seen_at, // falling back to created_at if it never came up. ref := c.LastSeenAt if ref == "" { ref = c.CreatedAt } lastAlive, parseErr := time.Parse("2006-01-02 15:04:05", ref) if parseErr != nil { slog.Warn("stale scanner: failed to parse last_seen_at", "id", c.ID, "ref", ref, "error", parseErr) continue } daysSinceAlive := int(now.Sub(lastAlive).Hours() / 24) if daysSinceAlive < thresholdDays { continue } currentStaleIDs[c.ID] = struct{}{} if _, alreadyKnown := s.knownStale[c.ID]; !alreadyKnown { s.emitStaleEvent(c, daysSinceAlive) } } s.knownStale = currentStaleIDs return nil } // FindStaleContainers returns all currently stale containers enriched with // workload + role labels for rendering. func (s *Scanner) FindStaleContainers(ctx context.Context) ([]StaleContainer, error) { settings, err := s.store.GetSettings() if err != nil { return nil, fmt.Errorf("get settings: %w", err) } thresholdDays := settings.StaleThresholdDays if thresholdDays <= 0 { thresholdDays = 7 } containers, err := s.store.ListContainers(store.ContainerFilter{}) if err != nil { return nil, fmt.Errorf("list containers: %w", err) } dockerContainers, err := s.docker.ListContainers(ctx, nil) if err != nil { // Docker unavailable — fall back to store-only detection. 
slog.Warn("stale scanner: docker unavailable, using store status only", "error", err) dockerContainers = nil } stateByContainerID := make(map[string]string, len(dockerContainers)) for _, dc := range dockerContainers { stateByContainerID[dc.ID] = dc.State } // Pre-load workload names so each stale row carries a friendly identifier. workloads, _ := s.store.ListWorkloads("") workloadNameByID := make(map[string]string, len(workloads)) for _, w := range workloads { workloadNameByID[w.ID] = w.Name } now := time.Now().UTC() var result []StaleContainer for _, c := range containers { if c.State == "removing" { continue } if stateByContainerID[c.ContainerID] == "running" { continue } ref := c.LastSeenAt if ref == "" { ref = c.CreatedAt } lastAliveTime, parseErr := time.Parse("2006-01-02 15:04:05", ref) if parseErr != nil { continue } daysSinceAlive := int(now.Sub(lastAliveTime).Hours() / 24) if daysSinceAlive < thresholdDays { continue } name := workloadNameByID[c.WorkloadID] if name == "" { name = c.WorkloadID } result = append(result, StaleContainer{ Container: c, WorkloadID: c.WorkloadID, WorkloadName: name, Role: c.Role, DaysStale: daysSinceAlive, }) } return result, nil } // emitStaleEvent publishes a warning event for a newly detected stale container. 
func (s *Scanner) emitStaleEvent(c store.Container, daysStale int) { metadata, _ := json.Marshal(map[string]any{ "container_id": c.ID, "workload_id": c.WorkloadID, "workload_kind": c.WorkloadKind, "role": c.Role, "image_tag": c.ImageTag, "last_seen_at": c.LastSeenAt, "days_stale": daysStale, }) msg := fmt.Sprintf("Container %s (tag: %s) has been non-running for %d days", c.ID, c.ImageTag, daysStale) evt, err := s.store.InsertEvent(store.EventLog{ Source: "stale_scanner", Severity: "warn", Message: msg, Metadata: string(metadata), }) if err != nil { slog.Error("stale scanner: failed to persist event", "error", err) return } s.eventBus.Publish(events.Event{ Type: events.EventLog, Payload: events.EventLogPayload{ ID: evt.ID, Source: "stale_scanner", Severity: "warn", Message: msg, Metadata: string(metadata), CreatedAt: evt.CreatedAt, }, }) }