feat(docker-watcher): phase 5 - registry client & poller
Gitea registry client with tag listing and pattern matching, cron-based polling scheduler with first-poll safety, poll state persistence. DeployTriggerer interface for decoupled deploy triggering.
This commit is contained in:
@@ -0,0 +1,216 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
)
|
||||
|
||||
// giteaPackageVersion represents a single version entry from the Gitea
|
||||
// packages API response.
|
||||
type giteaPackageVersion struct {
|
||||
ID int64 `json:"id"`
|
||||
Version string `json:"version"`
|
||||
Creator struct {
|
||||
Login string `json:"login"`
|
||||
} `json:"creator"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
}
|
||||
|
||||
// GiteaClient implements Client for Gitea container registries.
|
||||
type GiteaClient struct {
|
||||
baseURL string
|
||||
token string
|
||||
httpClient *http.Client
|
||||
}
|
||||
|
||||
// NewGiteaClient creates a new Gitea registry client.
|
||||
// baseURL should be the Gitea instance URL (e.g., "https://git.example.com").
|
||||
// token is a personal access token with package read permissions.
|
||||
func NewGiteaClient(baseURL, token string) *GiteaClient {
|
||||
return &GiteaClient{
|
||||
baseURL: strings.TrimRight(baseURL, "/"),
|
||||
token: token,
|
||||
httpClient: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ListTags returns all available tags for the given container image.
|
||||
// The image should be in the format "owner/package-name" or
|
||||
// "registry-host/owner/package-name" (the registry host prefix is stripped).
|
||||
func (c *GiteaClient) ListTags(ctx context.Context, image string) ([]string, error) {
|
||||
owner, pkg := parseImage(image)
|
||||
if owner == "" || pkg == "" {
|
||||
return nil, fmt.Errorf("invalid image format %q: expected owner/package", image)
|
||||
}
|
||||
|
||||
versions, err := c.listPackageVersions(ctx, owner, pkg)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list tags for %s/%s: %w", owner, pkg, err)
|
||||
}
|
||||
|
||||
tags := make([]string, 0, len(versions))
|
||||
for _, v := range versions {
|
||||
tags = append(tags, v.Version)
|
||||
}
|
||||
return tags, nil
|
||||
}
|
||||
|
||||
// GetLatestTag returns the most recently created tag matching the given glob
|
||||
// pattern. Returns empty string if no tags match.
|
||||
func (c *GiteaClient) GetLatestTag(ctx context.Context, image string, pattern string) (string, error) {
|
||||
tags, err := c.ListTags(ctx, image)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return LatestTag(tags, pattern)
|
||||
}
|
||||
|
||||
// listPackageVersions fetches all container package versions from the Gitea API.
|
||||
// Endpoint: GET /api/v1/packages/{owner}?type=container&q={package}
|
||||
// Gitea paginates results; this function fetches all pages.
|
||||
func (c *GiteaClient) listPackageVersions(ctx context.Context, owner, pkg string) ([]giteaPackageVersion, error) {
|
||||
var allVersions []giteaPackageVersion
|
||||
page := 1
|
||||
limit := 50
|
||||
|
||||
for {
|
||||
url := fmt.Sprintf("%s/api/v1/packages/%s?type=container&q=%s&page=%d&limit=%d",
|
||||
c.baseURL, owner, pkg, page, limit)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
|
||||
if c.token != "" {
|
||||
req.Header.Set("Authorization", "token "+c.token)
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("execute request: %w", err)
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read response body: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var packages []giteaPackageListEntry
|
||||
if err := json.Unmarshal(body, &packages); err != nil {
|
||||
return nil, fmt.Errorf("decode package list: %w", err)
|
||||
}
|
||||
|
||||
// Filter for exact package name match and collect versions.
|
||||
for _, p := range packages {
|
||||
if p.Name == pkg {
|
||||
versions, err := c.fetchPackageVersions(ctx, owner, pkg)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return versions, nil
|
||||
}
|
||||
}
|
||||
|
||||
// If we got fewer results than the limit, we've reached the last page.
|
||||
if len(packages) < limit {
|
||||
break
|
||||
}
|
||||
page++
|
||||
}
|
||||
|
||||
return allVersions, nil
|
||||
}
|
||||
|
||||
// giteaPackageListEntry represents a package in the Gitea packages list response.
|
||||
type giteaPackageListEntry struct {
|
||||
ID int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Type string `json:"type"`
|
||||
Version string `json:"version"`
|
||||
}
|
||||
|
||||
// fetchPackageVersions fetches all versions of a specific container package.
|
||||
// Endpoint: GET /api/v1/packages/{owner}/container/{name}
|
||||
func (c *GiteaClient) fetchPackageVersions(ctx context.Context, owner, pkg string) ([]giteaPackageVersion, error) {
|
||||
var allVersions []giteaPackageVersion
|
||||
page := 1
|
||||
limit := 50
|
||||
|
||||
for {
|
||||
url := fmt.Sprintf("%s/api/v1/packages/%s/container/%s?page=%d&limit=%d",
|
||||
c.baseURL, owner, pkg, page, limit)
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodGet, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create request: %w", err)
|
||||
}
|
||||
|
||||
if c.token != "" {
|
||||
req.Header.Set("Authorization", "token "+c.token)
|
||||
}
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("execute request: %w", err)
|
||||
}
|
||||
|
||||
body, err := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("read response body: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("unexpected status %d: %s", resp.StatusCode, string(body))
|
||||
}
|
||||
|
||||
var versions []giteaPackageVersion
|
||||
if err := json.Unmarshal(body, &versions); err != nil {
|
||||
return nil, fmt.Errorf("decode versions: %w", err)
|
||||
}
|
||||
|
||||
allVersions = append(allVersions, versions...)
|
||||
|
||||
if len(versions) < limit {
|
||||
break
|
||||
}
|
||||
page++
|
||||
}
|
||||
|
||||
return allVersions, nil
|
||||
}
|
||||
|
||||
// parseImage extracts the owner and package name from an image string.
|
||||
// Supported formats:
|
||||
// - "owner/package"
|
||||
// - "registry.example.com/owner/package"
|
||||
//
|
||||
// Returns empty strings if the format is invalid.
|
||||
func parseImage(image string) (owner, pkg string) {
|
||||
parts := strings.Split(image, "/")
|
||||
switch len(parts) {
|
||||
case 2:
|
||||
// owner/package
|
||||
return parts[0], parts[1]
|
||||
case 3:
|
||||
// registry.example.com/owner/package
|
||||
return parts[1], parts[2]
|
||||
default:
|
||||
return "", ""
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,210 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/alexei/docker-watcher/internal/crypto"
|
||||
"github.com/alexei/docker-watcher/internal/store"
|
||||
"github.com/robfig/cron/v3"
|
||||
)
|
||||
|
||||
// Poller periodically checks registries for new image tags and triggers
|
||||
// deployments for stages with auto_deploy enabled.
|
||||
type Poller struct {
|
||||
store *store.Store
|
||||
deployer DeployTriggerer
|
||||
encKey [32]byte
|
||||
cron *cron.Cron
|
||||
mu sync.Mutex
|
||||
entryID cron.EntryID
|
||||
running bool
|
||||
}
|
||||
|
||||
// NewPoller creates a new Poller instance.
|
||||
func NewPoller(st *store.Store, deployer DeployTriggerer, encKey [32]byte) *Poller {
|
||||
return &Poller{
|
||||
store: st,
|
||||
deployer: deployer,
|
||||
encKey: encKey,
|
||||
cron: cron.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins the polling scheduler with the given interval string (e.g., "5m", "1h").
|
||||
// If the poller is already running, it stops and restarts with the new interval.
|
||||
func (p *Poller) Start(interval string) error {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
duration, err := time.ParseDuration(interval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse polling interval %q: %w", interval, err)
|
||||
}
|
||||
|
||||
// Stop existing schedule if running.
|
||||
if p.running {
|
||||
p.cron.Remove(p.entryID)
|
||||
}
|
||||
|
||||
// Convert duration to a cron schedule: @every <duration>.
|
||||
spec := fmt.Sprintf("@every %s", duration.String())
|
||||
entryID, err := p.cron.AddFunc(spec, func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute)
|
||||
defer cancel()
|
||||
if pollErr := p.poll(ctx); pollErr != nil {
|
||||
log.Printf("[poller] poll error: %v", pollErr)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("schedule poller: %w", err)
|
||||
}
|
||||
|
||||
p.entryID = entryID
|
||||
if !p.running {
|
||||
p.cron.Start()
|
||||
}
|
||||
p.running = true
|
||||
|
||||
log.Printf("[poller] started with interval %s", duration)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down the poller.
|
||||
func (p *Poller) Stop() {
|
||||
p.mu.Lock()
|
||||
defer p.mu.Unlock()
|
||||
|
||||
if p.running {
|
||||
ctx := p.cron.Stop()
|
||||
<-ctx.Done()
|
||||
p.running = false
|
||||
log.Println("[poller] stopped")
|
||||
}
|
||||
}
|
||||
|
||||
// poll performs a single polling cycle: iterates over all projects and their
|
||||
// stages, checks for new tags, and triggers deploys where appropriate.
|
||||
func (p *Poller) poll(ctx context.Context) error {
|
||||
projects, err := p.store.GetAllProjects()
|
||||
if err != nil {
|
||||
return fmt.Errorf("get projects: %w", err)
|
||||
}
|
||||
|
||||
for _, project := range projects {
|
||||
if err := p.pollProject(ctx, project); err != nil {
|
||||
log.Printf("[poller] project %s (%s): %v", project.Name, project.ID, err)
|
||||
// Continue polling other projects even if one fails.
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// pollProject checks all stages of a single project for new tags.
|
||||
func (p *Poller) pollProject(ctx context.Context, project store.Project) error {
|
||||
if project.Registry == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Look up the registry configuration by name (projects store registry name, not ID).
|
||||
reg, err := p.store.GetRegistryByName(project.Registry)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get registry %s: %w", project.Registry, err)
|
||||
}
|
||||
|
||||
// Decrypt the registry token.
|
||||
token, err := crypto.Decrypt(p.encKey, reg.Token)
|
||||
if err != nil {
|
||||
// Token might not be encrypted (empty or plaintext).
|
||||
token = reg.Token
|
||||
}
|
||||
|
||||
// Create a registry client for this registry type.
|
||||
client, err := NewClient(reg.Type, reg.URL, token)
|
||||
if err != nil {
|
||||
return fmt.Errorf("create registry client: %w", err)
|
||||
}
|
||||
|
||||
// Fetch all available tags for the project image.
|
||||
tags, err := client.ListTags(ctx, project.Image)
|
||||
if err != nil {
|
||||
return fmt.Errorf("list tags for %s: %w", project.Image, err)
|
||||
}
|
||||
|
||||
// Check each stage of the project.
|
||||
stages, err := p.store.GetStagesByProjectID(project.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get stages for project %s: %w", project.ID, err)
|
||||
}
|
||||
|
||||
for _, stage := range stages {
|
||||
if err := p.pollStage(ctx, project, stage, tags); err != nil {
|
||||
log.Printf("[poller] project %s stage %s: %v", project.Name, stage.Name, err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// pollStage checks a single stage for new tags and triggers deploy if needed.
|
||||
func (p *Poller) pollStage(ctx context.Context, project store.Project, stage store.Stage, allTags []string) error {
|
||||
// Find the latest tag matching the stage's pattern.
|
||||
latest, err := LatestTag(allTags, stage.TagPattern)
|
||||
if err != nil {
|
||||
return fmt.Errorf("match tags for stage %s: %w", stage.Name, err)
|
||||
}
|
||||
if latest == "" {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Get the last polled tag for this stage.
|
||||
state, err := p.store.GetPollState(stage.ID)
|
||||
if err != nil {
|
||||
// No poll state yet — this is the first poll for this stage.
|
||||
// Record the current latest tag without triggering a deploy,
|
||||
// so we don't deploy everything on first startup.
|
||||
return p.store.UpsertPollState(store.PollState{
|
||||
StageID: stage.ID,
|
||||
LastTag: latest,
|
||||
LastPolled: now(),
|
||||
})
|
||||
}
|
||||
|
||||
// Update the poll timestamp regardless.
|
||||
defer func() {
|
||||
if err := p.store.UpsertPollState(store.PollState{
|
||||
StageID: stage.ID,
|
||||
LastTag: latest,
|
||||
LastPolled: now(),
|
||||
}); err != nil {
|
||||
log.Printf("[poller] failed to update poll state for stage %s: %v", stage.ID, err)
|
||||
}
|
||||
}()
|
||||
|
||||
// If the latest tag hasn't changed, nothing to do.
|
||||
if state.LastTag == latest {
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Printf("[poller] new tag %q detected for project %s stage %s (was %q)",
|
||||
latest, project.Name, stage.Name, state.LastTag)
|
||||
|
||||
// Only trigger deploy if auto_deploy is enabled for this stage.
|
||||
if !stage.AutoDeploy {
|
||||
log.Printf("[poller] auto_deploy disabled for stage %s, skipping deploy", stage.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := p.deployer.TriggerDeploy(ctx, project.ID, stage.ID, latest); err != nil {
|
||||
return fmt.Errorf("trigger deploy for tag %s: %w", latest, err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// now returns the current UTC time as a formatted string.
|
||||
func now() string {
|
||||
return time.Now().UTC().Format("2006-01-02 15:04:05")
|
||||
}
|
||||
@@ -0,0 +1,78 @@
|
||||
package registry
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"path"
|
||||
"sort"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Client defines the interface for interacting with a container image registry.
|
||||
type Client interface {
|
||||
// ListTags returns all available tags for the given image.
|
||||
ListTags(ctx context.Context, image string) ([]string, error)
|
||||
|
||||
// GetLatestTag returns the most recently created tag that matches the given
|
||||
// glob pattern. Returns an empty string and no error if no tags match.
|
||||
GetLatestTag(ctx context.Context, image string, pattern string) (string, error)
|
||||
}
|
||||
|
||||
// DeployTriggerer is called by the poller when a new tag is detected for a
|
||||
// stage with auto_deploy enabled. This decouples the registry package from the
|
||||
// deployer implementation.
|
||||
type DeployTriggerer interface {
|
||||
TriggerDeploy(ctx context.Context, projectID, stageID, imageTag string) error
|
||||
}
|
||||
|
||||
// MatchTags filters a list of tags, returning only those that match the given
|
||||
// glob pattern. Pattern matching uses path.Match semantics (*, ?, []).
|
||||
// Returns an error if the pattern is malformed.
|
||||
func MatchTags(tags []string, pattern string) ([]string, error) {
|
||||
if pattern == "" || pattern == "*" {
|
||||
result := make([]string, len(tags))
|
||||
copy(result, tags)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// Validate pattern once before iterating.
|
||||
if _, err := path.Match(pattern, ""); err != nil {
|
||||
return nil, fmt.Errorf("invalid tag pattern %q: %w", pattern, err)
|
||||
}
|
||||
|
||||
var matched []string
|
||||
for _, tag := range tags {
|
||||
ok, _ := path.Match(pattern, tag)
|
||||
if ok {
|
||||
matched = append(matched, tag)
|
||||
}
|
||||
}
|
||||
return matched, nil
|
||||
}
|
||||
|
||||
// LatestTag returns the last element of a sorted tag list that matches the
|
||||
// pattern. Tags are sorted lexicographically; the "latest" is the last in sort
|
||||
// order. Returns empty string if no tags match. Returns an error if the pattern
|
||||
// is malformed.
|
||||
func LatestTag(tags []string, pattern string) (string, error) {
|
||||
matched, err := MatchTags(tags, pattern)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if len(matched) == 0 {
|
||||
return "", nil
|
||||
}
|
||||
sort.Strings(matched)
|
||||
return matched[len(matched)-1], nil
|
||||
}
|
||||
|
||||
// NewClient creates a registry Client based on the registry type string.
|
||||
// Supported types: "gitea". Future: "github", "dockerhub".
|
||||
func NewClient(registryType, baseURL, token string) (Client, error) {
|
||||
switch strings.ToLower(registryType) {
|
||||
case "gitea":
|
||||
return NewGiteaClient(baseURL, token), nil
|
||||
default:
|
||||
return nil, fmt.Errorf("unsupported registry type: %s", registryType)
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user