90be636d66
Gitea registry client with tag listing and pattern matching, cron-based polling scheduler with first-poll safety, poll state persistence. DeployTriggerer interface for decoupled deploy triggering.
211 lines
5.7 KiB
Go
211 lines
5.7 KiB
Go
package registry
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/alexei/docker-watcher/internal/crypto"
|
|
"github.com/alexei/docker-watcher/internal/store"
|
|
"github.com/robfig/cron/v3"
|
|
)
|
|
|
|
// Poller periodically checks registries for new image tags and triggers
|
|
// deployments for stages with auto_deploy enabled.
|
|
type Poller struct {
|
|
store *store.Store
|
|
deployer DeployTriggerer
|
|
encKey [32]byte
|
|
cron *cron.Cron
|
|
mu sync.Mutex
|
|
entryID cron.EntryID
|
|
running bool
|
|
}
|
|
|
|
// NewPoller creates a new Poller instance.
|
|
func NewPoller(st *store.Store, deployer DeployTriggerer, encKey [32]byte) *Poller {
|
|
return &Poller{
|
|
store: st,
|
|
deployer: deployer,
|
|
encKey: encKey,
|
|
cron: cron.New(),
|
|
}
|
|
}
|
|
|
|
// Start begins the polling scheduler with the given interval string (e.g., "5m", "1h").
|
|
// If the poller is already running, it stops and restarts with the new interval.
|
|
func (p *Poller) Start(interval string) error {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
duration, err := time.ParseDuration(interval)
|
|
if err != nil {
|
|
return fmt.Errorf("parse polling interval %q: %w", interval, err)
|
|
}
|
|
|
|
// Stop existing schedule if running.
|
|
if p.running {
|
|
p.cron.Remove(p.entryID)
|
|
}
|
|
|
|
// Convert duration to a cron schedule: @every <duration>.
|
|
spec := fmt.Sprintf("@every %s", duration.String())
|
|
entryID, err := p.cron.AddFunc(spec, func() {
|
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
|
defer cancel()
|
|
if pollErr := p.poll(ctx); pollErr != nil {
|
|
log.Printf("[poller] poll error: %v", pollErr)
|
|
}
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("schedule poller: %w", err)
|
|
}
|
|
|
|
p.entryID = entryID
|
|
if !p.running {
|
|
p.cron.Start()
|
|
}
|
|
p.running = true
|
|
|
|
log.Printf("[poller] started with interval %s", duration)
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully shuts down the poller.
|
|
func (p *Poller) Stop() {
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
if p.running {
|
|
ctx := p.cron.Stop()
|
|
<-ctx.Done()
|
|
p.running = false
|
|
log.Println("[poller] stopped")
|
|
}
|
|
}
|
|
|
|
// poll performs a single polling cycle: iterates over all projects and their
|
|
// stages, checks for new tags, and triggers deploys where appropriate.
|
|
func (p *Poller) poll(ctx context.Context) error {
|
|
projects, err := p.store.GetAllProjects()
|
|
if err != nil {
|
|
return fmt.Errorf("get projects: %w", err)
|
|
}
|
|
|
|
for _, project := range projects {
|
|
if err := p.pollProject(ctx, project); err != nil {
|
|
log.Printf("[poller] project %s (%s): %v", project.Name, project.ID, err)
|
|
// Continue polling other projects even if one fails.
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// pollProject checks all stages of a single project for new tags.
|
|
func (p *Poller) pollProject(ctx context.Context, project store.Project) error {
|
|
if project.Registry == "" {
|
|
return nil
|
|
}
|
|
|
|
// Look up the registry configuration by name (projects store registry name, not ID).
|
|
reg, err := p.store.GetRegistryByName(project.Registry)
|
|
if err != nil {
|
|
return fmt.Errorf("get registry %s: %w", project.Registry, err)
|
|
}
|
|
|
|
// Decrypt the registry token.
|
|
token, err := crypto.Decrypt(p.encKey, reg.Token)
|
|
if err != nil {
|
|
// Token might not be encrypted (empty or plaintext).
|
|
token = reg.Token
|
|
}
|
|
|
|
// Create a registry client for this registry type.
|
|
client, err := NewClient(reg.Type, reg.URL, token)
|
|
if err != nil {
|
|
return fmt.Errorf("create registry client: %w", err)
|
|
}
|
|
|
|
// Fetch all available tags for the project image.
|
|
tags, err := client.ListTags(ctx, project.Image)
|
|
if err != nil {
|
|
return fmt.Errorf("list tags for %s: %w", project.Image, err)
|
|
}
|
|
|
|
// Check each stage of the project.
|
|
stages, err := p.store.GetStagesByProjectID(project.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("get stages for project %s: %w", project.ID, err)
|
|
}
|
|
|
|
for _, stage := range stages {
|
|
if err := p.pollStage(ctx, project, stage, tags); err != nil {
|
|
log.Printf("[poller] project %s stage %s: %v", project.Name, stage.Name, err)
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// pollStage checks a single stage for new tags and triggers deploy if needed.
|
|
func (p *Poller) pollStage(ctx context.Context, project store.Project, stage store.Stage, allTags []string) error {
|
|
// Find the latest tag matching the stage's pattern.
|
|
latest, err := LatestTag(allTags, stage.TagPattern)
|
|
if err != nil {
|
|
return fmt.Errorf("match tags for stage %s: %w", stage.Name, err)
|
|
}
|
|
if latest == "" {
|
|
return nil
|
|
}
|
|
|
|
// Get the last polled tag for this stage.
|
|
state, err := p.store.GetPollState(stage.ID)
|
|
if err != nil {
|
|
// No poll state yet — this is the first poll for this stage.
|
|
// Record the current latest tag without triggering a deploy,
|
|
// so we don't deploy everything on first startup.
|
|
return p.store.UpsertPollState(store.PollState{
|
|
StageID: stage.ID,
|
|
LastTag: latest,
|
|
LastPolled: now(),
|
|
})
|
|
}
|
|
|
|
// Update the poll timestamp regardless.
|
|
defer func() {
|
|
if err := p.store.UpsertPollState(store.PollState{
|
|
StageID: stage.ID,
|
|
LastTag: latest,
|
|
LastPolled: now(),
|
|
}); err != nil {
|
|
log.Printf("[poller] failed to update poll state for stage %s: %v", stage.ID, err)
|
|
}
|
|
}()
|
|
|
|
// If the latest tag hasn't changed, nothing to do.
|
|
if state.LastTag == latest {
|
|
return nil
|
|
}
|
|
|
|
log.Printf("[poller] new tag %q detected for project %s stage %s (was %q)",
|
|
latest, project.Name, stage.Name, state.LastTag)
|
|
|
|
// Only trigger deploy if auto_deploy is enabled for this stage.
|
|
if !stage.AutoDeploy {
|
|
log.Printf("[poller] auto_deploy disabled for stage %s, skipping deploy", stage.Name)
|
|
return nil
|
|
}
|
|
|
|
if err := p.deployer.TriggerDeploy(ctx, project.ID, stage.ID, latest); err != nil {
|
|
return fmt.Errorf("trigger deploy for tag %s: %w", latest, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// now reports the current time in UTC, rendered as "YYYY-MM-DD HH:MM:SS".
func now() string {
	// Go reference-time layout for the timestamp format above.
	const layout = "2006-01-02 15:04:05"

	utc := time.Now().UTC()
	return utc.Format(layout)
}
|