package registry import ( "context" "fmt" "log/slog" "sync" "time" "github.com/alexei/tinyforge/internal/crypto" "github.com/alexei/tinyforge/internal/store" "github.com/robfig/cron/v3" ) // Poller periodically checks registries for new image tags and triggers // deployments for stages with auto_deploy enabled. type Poller struct { store *store.Store deployer DeployTriggerer encKey [32]byte cron *cron.Cron mu sync.Mutex entryID cron.EntryID running bool } // NewPoller creates a new Poller instance. func NewPoller(st *store.Store, deployer DeployTriggerer, encKey [32]byte) *Poller { return &Poller{ store: st, deployer: deployer, encKey: encKey, cron: cron.New(), } } // Start begins the polling scheduler with the given interval string (e.g., "5m", "1h"). // If the poller is already running, it stops and restarts with the new interval. func (p *Poller) Start(interval string) error { p.mu.Lock() defer p.mu.Unlock() duration, err := time.ParseDuration(interval) if err != nil { return fmt.Errorf("parse polling interval %q: %w", interval, err) } // Stop existing schedule if running. if p.running { p.cron.Remove(p.entryID) } // Convert duration to a cron schedule: @every . spec := fmt.Sprintf("@every %s", duration.String()) entryID, err := p.cron.AddFunc(spec, func() { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) defer cancel() if pollErr := p.poll(ctx); pollErr != nil { slog.Warn("poller: poll error", "error", pollErr) } }) if err != nil { return fmt.Errorf("schedule poller: %w", err) } p.entryID = entryID if !p.running { p.cron.Start() } p.running = true slog.Info("poller started", "interval", duration.String()) return nil } // Stop gracefully shuts down the poller. func (p *Poller) Stop() { p.mu.Lock() defer p.mu.Unlock() if p.running { ctx := p.cron.Stop() <-ctx.Done() p.running = false slog.Info("poller stopped") } } // poll performs a single polling cycle: iterates over all projects and their // stages, checks for new tags, and triggers deploys where appropriate. func (p *Poller) poll(ctx context.Context) error { projects, err := p.store.GetAllProjects() if err != nil { return fmt.Errorf("get projects: %w", err) } for _, project := range projects { if err := p.pollProject(ctx, project); err != nil { slog.Warn("poller: project error", "project", project.Name, "id", project.ID, "error", err) } } return nil } // pollProject checks all stages of a single project for new tags. func (p *Poller) pollProject(ctx context.Context, project store.Project) error { if project.Registry == "" { return nil } reg, err := p.store.GetRegistryByName(project.Registry) if err != nil { return fmt.Errorf("get registry %s: %w", project.Registry, err) } token, err := crypto.Decrypt(p.encKey, reg.Token) if err != nil { token = reg.Token } client, err := NewClient(reg.Type, reg.URL, token) if err != nil { return fmt.Errorf("create registry client: %w", err) } tags, err := client.ListTags(ctx, project.Image) if err != nil { return fmt.Errorf("list tags for %s: %w", project.Image, err) } stages, err := p.store.GetStagesByProjectID(project.ID) if err != nil { return fmt.Errorf("get stages for project %s: %w", project.ID, err) } for _, stage := range stages { if err := p.pollStage(ctx, project, stage, tags); err != nil { slog.Warn("poller: stage error", "project", project.Name, "stage", stage.Name, "error", err) } } return nil } // pollStage checks a single stage for new tags and triggers deploy if needed. func (p *Poller) pollStage(ctx context.Context, project store.Project, stage store.Stage, allTags []string) error { latest, err := LatestTag(allTags, stage.TagPattern) if err != nil { return fmt.Errorf("match tags for stage %s: %w", stage.Name, err) } if latest == "" { return nil } state, err := p.store.GetPollState(stage.ID) if err != nil { return p.store.UpsertPollState(store.PollState{ StageID: stage.ID, LastTag: latest, LastPolled: store.Now(), }) } defer func() { if err := p.store.UpsertPollState(store.PollState{ StageID: stage.ID, LastTag: latest, LastPolled: store.Now(), }); err != nil { slog.Warn("poller: failed to update poll state", "stage_id", stage.ID, "error", err) } }() if state.LastTag == latest { return nil } slog.Info("poller: new tag detected", "tag", latest, "project", project.Name, "stage", stage.Name, "previous", state.LastTag) if !stage.AutoDeploy { slog.Info("poller: auto_deploy disabled, skipping", "stage", stage.Name) return nil } if err := p.deployer.TriggerDeploy(ctx, project.ID, stage.ID, latest); err != nil { return fmt.Errorf("trigger deploy for tag %s: %w", latest, err) } return nil }