Files
tiny-forge/internal/registry/poller.go
T
alexei.dolgolyov 791cd4d6af
Build / build (push) Successful in 12m20s
feat: rename Docker Watcher to Tinyforge
Rebrand the project as Tinyforge to reflect its evolution from a Docker
container watcher into a self-hosted mini CI/deployment platform.

Rename covers: Go module path, Docker labels, DB/config filenames,
JWT issuer, Dockerfile binary, docker-compose, CI workflows, frontend
i18n, README with static sites docs, and all code comments.
2026-04-12 21:30:39 +03:00

190 lines
4.8 KiB
Go

package registry
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/alexei/tinyforge/internal/crypto"
"github.com/alexei/tinyforge/internal/store"
"github.com/robfig/cron/v3"
)
// Poller periodically checks registries for new image tags and triggers
// deployments for stages with auto_deploy enabled.
//
// Scheduling is delegated to a cron instance; Start and Stop hold mu while
// they read or modify the schedule bookkeeping (entryID, running).
type Poller struct {
	// store supplies projects, registries, stages, and persisted poll state.
	store *store.Store
	// deployer is invoked when a new tag is found on an auto-deploy stage.
	deployer DeployTriggerer
	// encKey is passed to crypto.Decrypt to decode stored registry tokens.
	encKey [32]byte
	// cron runs the periodic poll job.
	cron *cron.Cron
	// mu is locked by Start and Stop for their full duration.
	mu sync.Mutex
	// entryID is the cron entry returned by the most recent successful Start.
	entryID cron.EntryID
	// running is set by Start and cleared by Stop.
	running bool
}
// NewPoller builds a Poller wired to the given store, deploy trigger, and
// token encryption key. The returned poller owns a fresh cron scheduler and
// does nothing until Start is called.
func NewPoller(st *store.Store, deployer DeployTriggerer, encKey [32]byte) *Poller {
	p := new(Poller)
	p.store = st
	p.deployer = deployer
	p.encKey = encKey
	p.cron = cron.New()
	return p
}
// Start begins the polling scheduler with the given interval string (e.g., "5m", "1h").
// If a schedule from an earlier Start is still registered (whether or not the
// poller was stopped in between), it is replaced by the new one.
//
// The interval must be accepted by time.ParseDuration and be strictly
// positive. On any error the previously registered schedule, if any, is left
// in place.
func (p *Poller) Start(interval string) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	duration, err := time.ParseDuration(interval)
	if err != nil {
		return fmt.Errorf("parse polling interval %q: %w", interval, err)
	}
	// Zero and negative durations parse fine, but cron's "@every" clamps any
	// delay below one second up to one second, which would poll every
	// registry once per second. Reject them instead.
	if duration <= 0 {
		return fmt.Errorf("polling interval %q must be positive", interval)
	}

	// Convert duration to a cron schedule: @every <duration>.
	spec := fmt.Sprintf("@every %s", duration.String())
	entryID, err := p.cron.AddFunc(spec, func() {
		// Bound each cycle so a hung registry cannot stall the job forever.
		ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
		defer cancel()
		if pollErr := p.poll(ctx); pollErr != nil {
			slog.Warn("poller: poll error", "error", pollErr)
		}
	})
	if err != nil {
		return fmt.Errorf("schedule poller: %w", err)
	}

	// Remove the previous entry only after the new one is registered. The
	// check is on entryID rather than running because Stop leaves the old
	// entry inside the cron instance; keying on running would let a
	// Stop/Start cycle accumulate duplicate poll jobs. Valid cron entry IDs
	// are non-zero, so zero means "nothing registered yet".
	if p.entryID != 0 {
		p.cron.Remove(p.entryID)
	}
	p.entryID = entryID

	if !p.running {
		p.cron.Start()
	}
	p.running = true
	slog.Info("poller started", "interval", duration.String())
	return nil
}
// Stop halts the scheduler and blocks until the context returned by the cron
// instance's Stop is done. Calling Stop on a poller that is not running is a
// no-op.
func (p *Poller) Stop() {
	p.mu.Lock()
	defer p.mu.Unlock()

	if !p.running {
		return
	}

	stopped := p.cron.Stop()
	<-stopped.Done()
	p.running = false
	slog.Info("poller stopped")
}
// poll performs a single polling cycle: iterates over all projects and their
// stages, checks for new tags, and triggers deploys where appropriate.
//
// A failure in one project is logged and does not stop the cycle. The cycle
// is abandoned, and an error wrapping ctx.Err() returned, as soon as ctx is
// cancelled or its deadline passes, so the remaining projects are not
// attempted with a dead context.
func (p *Poller) poll(ctx context.Context) error {
	projects, err := p.store.GetAllProjects()
	if err != nil {
		return fmt.Errorf("get projects: %w", err)
	}

	for _, project := range projects {
		// Stop early once the context is done: every remaining registry call
		// would fail anyway and only produce one warning per project.
		if ctxErr := ctx.Err(); ctxErr != nil {
			return fmt.Errorf("poll interrupted: %w", ctxErr)
		}
		if err := p.pollProject(ctx, project); err != nil {
			slog.Warn("poller: project error", "project", project.Name, "id", project.ID, "error", err)
		}
	}
	return nil
}
// pollProject fetches the tag list for one project's image and evaluates
// every stage of that project against it. Projects without a registry are
// skipped. Per-stage failures are logged and do not abort the remaining
// stages; only setup failures (registry lookup, client creation, tag listing,
// stage lookup) are returned.
func (p *Poller) pollProject(ctx context.Context, project store.Project) error {
	if project.Registry == "" {
		return nil
	}

	regEntry, err := p.store.GetRegistryByName(project.Registry)
	if err != nil {
		return fmt.Errorf("get registry %s: %w", project.Registry, err)
	}

	// If decryption fails, fall back to using the stored token value as-is.
	secret, decErr := crypto.Decrypt(p.encKey, regEntry.Token)
	if decErr != nil {
		secret = regEntry.Token
	}

	cli, err := NewClient(regEntry.Type, regEntry.URL, secret)
	if err != nil {
		return fmt.Errorf("create registry client: %w", err)
	}

	available, err := cli.ListTags(ctx, project.Image)
	if err != nil {
		return fmt.Errorf("list tags for %s: %w", project.Image, err)
	}

	projectStages, err := p.store.GetStagesByProjectID(project.ID)
	if err != nil {
		return fmt.Errorf("get stages for project %s: %w", project.ID, err)
	}

	for _, st := range projectStages {
		stageErr := p.pollStage(ctx, project, st, available)
		if stageErr == nil {
			continue
		}
		slog.Warn("poller: stage error", "project", project.Name, "stage", st.Name, "error", stageErr)
	}
	return nil
}
// pollStage compares the newest tag matching the stage's pattern against the
// stage's stored poll state and triggers a deploy when the tag changed and
// the stage has auto-deploy enabled.
//
// When the stored state cannot be read, the current tag is recorded and no
// deploy is triggered. Otherwise the state is rewritten on every exit path,
// including when the deploy trigger returns an error, so a failed trigger is
// not retried for the same tag on the next cycle.
func (p *Poller) pollStage(ctx context.Context, project store.Project, stage store.Stage, allTags []string) error {
	latest, err := LatestTag(allTags, stage.TagPattern)
	switch {
	case err != nil:
		return fmt.Errorf("match tags for stage %s: %w", stage.Name, err)
	case latest == "":
		// Nothing matches the pattern; leave the stored state untouched.
		return nil
	}

	// record persists latest as the stage's last seen tag, stamped with the
	// time at which it is called.
	record := func() error {
		return p.store.UpsertPollState(store.PollState{
			StageID:    stage.ID,
			LastTag:    latest,
			LastPolled: store.Now(),
		})
	}

	state, err := p.store.GetPollState(stage.ID)
	if err != nil {
		return record()
	}

	// From here on the state is refreshed no matter how the function exits.
	defer func() {
		if recErr := record(); recErr != nil {
			slog.Warn("poller: failed to update poll state", "stage_id", stage.ID, "error", recErr)
		}
	}()

	if state.LastTag == latest {
		return nil
	}
	slog.Info("poller: new tag detected", "tag", latest, "project", project.Name, "stage", stage.Name, "previous", state.LastTag)

	if !stage.AutoDeploy {
		slog.Info("poller: auto_deploy disabled, skipping", "stage", stage.Name)
		return nil
	}

	deployErr := p.deployer.TriggerDeploy(ctx, project.ID, stage.ID, latest)
	if deployErr != nil {
		return fmt.Errorf("trigger deploy for tag %s: %w", latest, deployErr)
	}
	return nil
}