Files
tiny-forge/internal/registry/poller.go
T
alexei.dolgolyov be6ad15efc fix: comprehensive security, performance, and quality hardening
Security: apply AdminOnly middleware to mutating routes, require
ENCRYPTION_KEY and ADMIN_PASSWORD (no insecure defaults), restrict
CORS to same-origin, fix OIDC token delivery via cookie instead of
URL query param, add rate limiting on login, add MaxBytesReader,
validate volume paths against traversal, add security headers,
validate user roles, add Secure flag to OIDC cookie.

Performance: set SQLite MaxOpenConns(1) to prevent SQLITE_BUSY,
add FK indexes on 8 columns, track notifier goroutines with
WaitGroup for graceful shutdown, use GetRegistryByName instead of
GetAllRegistries in deployer, pass basePath param to avoid redundant
settings query, return empty slices from store to remove reflection.

Quality: refactor TriggerDeploy to delegate to runDeploy (~100 lines
removed), consolidate duplicated utilities (extractPort, boolToInt,
now, isTerminalStatus) into shared exports, migrate all log.Printf
to slog structured logging, use consistent webhook response envelope,
remove dead code (parseEnvVars, duplicate auth types).

UX: clean up NPM proxy on instance removal via API, add README with
quickstart guide, add .env.example, require ADMIN_PASSWORD in
docker-compose, document staging-net prerequisite.
2026-03-29 12:49:24 +03:00

190 lines
4.8 KiB
Go

package registry
import (
"context"
"fmt"
"log/slog"
"sync"
"time"
"github.com/alexei/docker-watcher/internal/crypto"
"github.com/alexei/docker-watcher/internal/store"
"github.com/robfig/cron/v3"
)
// Poller periodically checks registries for new image tags and triggers
// deployments for stages with auto_deploy enabled.
type Poller struct {
store *store.Store
deployer DeployTriggerer
encKey [32]byte
cron *cron.Cron
mu sync.Mutex
entryID cron.EntryID
running bool
}
// NewPoller creates a new Poller instance.
func NewPoller(st *store.Store, deployer DeployTriggerer, encKey [32]byte) *Poller {
return &Poller{
store: st,
deployer: deployer,
encKey: encKey,
cron: cron.New(),
}
}
// Start begins the polling scheduler with the given interval string (e.g., "5m", "1h").
// If the poller is already running, it stops and restarts with the new interval.
func (p *Poller) Start(interval string) error {
p.mu.Lock()
defer p.mu.Unlock()
duration, err := time.ParseDuration(interval)
if err != nil {
return fmt.Errorf("parse polling interval %q: %w", interval, err)
}
// Stop existing schedule if running.
if p.running {
p.cron.Remove(p.entryID)
}
// Convert duration to a cron schedule: @every <duration>.
spec := fmt.Sprintf("@every %s", duration.String())
entryID, err := p.cron.AddFunc(spec, func() {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
defer cancel()
if pollErr := p.poll(ctx); pollErr != nil {
slog.Warn("poller: poll error", "error", pollErr)
}
})
if err != nil {
return fmt.Errorf("schedule poller: %w", err)
}
p.entryID = entryID
if !p.running {
p.cron.Start()
}
p.running = true
slog.Info("poller started", "interval", duration.String())
return nil
}
// Stop gracefully shuts down the poller.
func (p *Poller) Stop() {
p.mu.Lock()
defer p.mu.Unlock()
if p.running {
ctx := p.cron.Stop()
<-ctx.Done()
p.running = false
slog.Info("poller stopped")
}
}
// poll performs a single polling cycle: iterates over all projects and their
// stages, checks for new tags, and triggers deploys where appropriate.
func (p *Poller) poll(ctx context.Context) error {
projects, err := p.store.GetAllProjects()
if err != nil {
return fmt.Errorf("get projects: %w", err)
}
for _, project := range projects {
if err := p.pollProject(ctx, project); err != nil {
slog.Warn("poller: project error", "project", project.Name, "id", project.ID, "error", err)
}
}
return nil
}
// pollProject checks all stages of a single project for new tags.
func (p *Poller) pollProject(ctx context.Context, project store.Project) error {
if project.Registry == "" {
return nil
}
reg, err := p.store.GetRegistryByName(project.Registry)
if err != nil {
return fmt.Errorf("get registry %s: %w", project.Registry, err)
}
token, err := crypto.Decrypt(p.encKey, reg.Token)
if err != nil {
token = reg.Token
}
client, err := NewClient(reg.Type, reg.URL, token)
if err != nil {
return fmt.Errorf("create registry client: %w", err)
}
tags, err := client.ListTags(ctx, project.Image)
if err != nil {
return fmt.Errorf("list tags for %s: %w", project.Image, err)
}
stages, err := p.store.GetStagesByProjectID(project.ID)
if err != nil {
return fmt.Errorf("get stages for project %s: %w", project.ID, err)
}
for _, stage := range stages {
if err := p.pollStage(ctx, project, stage, tags); err != nil {
slog.Warn("poller: stage error", "project", project.Name, "stage", stage.Name, "error", err)
}
}
return nil
}
// pollStage checks a single stage for new tags and triggers deploy if needed.
func (p *Poller) pollStage(ctx context.Context, project store.Project, stage store.Stage, allTags []string) error {
latest, err := LatestTag(allTags, stage.TagPattern)
if err != nil {
return fmt.Errorf("match tags for stage %s: %w", stage.Name, err)
}
if latest == "" {
return nil
}
state, err := p.store.GetPollState(stage.ID)
if err != nil {
return p.store.UpsertPollState(store.PollState{
StageID: stage.ID,
LastTag: latest,
LastPolled: store.Now(),
})
}
defer func() {
if err := p.store.UpsertPollState(store.PollState{
StageID: stage.ID,
LastTag: latest,
LastPolled: store.Now(),
}); err != nil {
slog.Warn("poller: failed to update poll state", "stage_id", stage.ID, "error", err)
}
}()
if state.LastTag == latest {
return nil
}
slog.Info("poller: new tag detected", "tag", latest, "project", project.Name, "stage", stage.Name, "previous", state.LastTag)
if !stage.AutoDeploy {
slog.Info("poller: auto_deploy disabled, skipping", "stage", stage.Name)
return nil
}
if err := p.deployer.TriggerDeploy(ctx, project.ID, stage.ID, latest); err != nil {
return fmt.Errorf("trigger deploy for tag %s: %w", latest, err)
}
return nil
}