package registry import ( "context" "fmt" "log" "sync" "time" "github.com/alexei/docker-watcher/internal/crypto" "github.com/alexei/docker-watcher/internal/store" "github.com/robfig/cron/v3" ) // Poller periodically checks registries for new image tags and triggers // deployments for stages with auto_deploy enabled. type Poller struct { store *store.Store deployer DeployTriggerer encKey [32]byte cron *cron.Cron mu sync.Mutex entryID cron.EntryID running bool } // NewPoller creates a new Poller instance. func NewPoller(st *store.Store, deployer DeployTriggerer, encKey [32]byte) *Poller { return &Poller{ store: st, deployer: deployer, encKey: encKey, cron: cron.New(), } } // Start begins the polling scheduler with the given interval string (e.g., "5m", "1h"). // If the poller is already running, it stops and restarts with the new interval. func (p *Poller) Start(interval string) error { p.mu.Lock() defer p.mu.Unlock() duration, err := time.ParseDuration(interval) if err != nil { return fmt.Errorf("parse polling interval %q: %w", interval, err) } // Stop existing schedule if running. if p.running { p.cron.Remove(p.entryID) } // Convert duration to a cron schedule: @every . spec := fmt.Sprintf("@every %s", duration.String()) entryID, err := p.cron.AddFunc(spec, func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() if pollErr := p.poll(ctx); pollErr != nil { log.Printf("[poller] poll error: %v", pollErr) } }) if err != nil { return fmt.Errorf("schedule poller: %w", err) } p.entryID = entryID if !p.running { p.cron.Start() } p.running = true log.Printf("[poller] started with interval %s", duration) return nil } // Stop gracefully shuts down the poller. func (p *Poller) Stop() { p.mu.Lock() defer p.mu.Unlock() if p.running { ctx := p.cron.Stop() <-ctx.Done() p.running = false log.Println("[poller] stopped") } } // poll performs a single polling cycle: iterates over all projects and their // stages, checks for new tags, and triggers deploys where appropriate. func (p *Poller) poll(ctx context.Context) error { projects, err := p.store.GetAllProjects() if err != nil { return fmt.Errorf("get projects: %w", err) } for _, project := range projects { if err := p.pollProject(ctx, project); err != nil { log.Printf("[poller] project %s (%s): %v", project.Name, project.ID, err) // Continue polling other projects even if one fails. } } return nil } // pollProject checks all stages of a single project for new tags. func (p *Poller) pollProject(ctx context.Context, project store.Project) error { if project.Registry == "" { return nil } // Look up the registry configuration by name (projects store registry name, not ID). reg, err := p.store.GetRegistryByName(project.Registry) if err != nil { return fmt.Errorf("get registry %s: %w", project.Registry, err) } // Decrypt the registry token. token, err := crypto.Decrypt(p.encKey, reg.Token) if err != nil { // Token might not be encrypted (empty or plaintext). token = reg.Token } // Create a registry client for this registry type. client, err := NewClient(reg.Type, reg.URL, token) if err != nil { return fmt.Errorf("create registry client: %w", err) } // Fetch all available tags for the project image. tags, err := client.ListTags(ctx, project.Image) if err != nil { return fmt.Errorf("list tags for %s: %w", project.Image, err) } // Check each stage of the project. stages, err := p.store.GetStagesByProjectID(project.ID) if err != nil { return fmt.Errorf("get stages for project %s: %w", project.ID, err) } for _, stage := range stages { if err := p.pollStage(ctx, project, stage, tags); err != nil { log.Printf("[poller] project %s stage %s: %v", project.Name, stage.Name, err) } } return nil } // pollStage checks a single stage for new tags and triggers deploy if needed. func (p *Poller) pollStage(ctx context.Context, project store.Project, stage store.Stage, allTags []string) error { // Find the latest tag matching the stage's pattern. latest, err := LatestTag(allTags, stage.TagPattern) if err != nil { return fmt.Errorf("match tags for stage %s: %w", stage.Name, err) } if latest == "" { return nil } // Get the last polled tag for this stage. state, err := p.store.GetPollState(stage.ID) if err != nil { // No poll state yet — this is the first poll for this stage. // Record the current latest tag without triggering a deploy, // so we don't deploy everything on first startup. return p.store.UpsertPollState(store.PollState{ StageID: stage.ID, LastTag: latest, LastPolled: now(), }) } // Update the poll timestamp regardless. defer func() { if err := p.store.UpsertPollState(store.PollState{ StageID: stage.ID, LastTag: latest, LastPolled: now(), }); err != nil { log.Printf("[poller] failed to update poll state for stage %s: %v", stage.ID, err) } }() // If the latest tag hasn't changed, nothing to do. if state.LastTag == latest { return nil } log.Printf("[poller] new tag %q detected for project %s stage %s (was %q)", latest, project.Name, stage.Name, state.LastTag) // Only trigger deploy if auto_deploy is enabled for this stage. if !stage.AutoDeploy { log.Printf("[poller] auto_deploy disabled for stage %s, skipping deploy", stage.Name) return nil } if err := p.deployer.TriggerDeploy(ctx, project.ID, stage.ID, latest); err != nil { return fmt.Errorf("trigger deploy for tag %s: %w", latest, err) } return nil } // now returns the current UTC time as a formatted string. func now() string { return time.Now().UTC().Format("2006-01-02 15:04:05") }