Files
tiny-forge/cmd/server/main.go
T
alexei.dolgolyov 7d6719da12 refactor: extract ProxyProvider interface with None and NPM implementations
Replace direct npm.Client usage throughout the codebase with the
proxy.Provider interface, enabling pluggable proxy backends. The
deployer, API layer, and proxy manager now use provider-agnostic
route management (ConfigureRoute/DeleteRoute) instead of NPM-specific
API calls. Adds ProxyRouteID (string) to Instance model and
ProxyProvider setting to Settings, with SQLite migrations for
backward compatibility.
2026-04-04 19:39:08 +03:00

440 lines
12 KiB
Go

package main
import (
"context"
"errors"
"fmt"
"io/fs"
"log/slog"
"net/http"
"os"
"os/signal"
"path/filepath"
"syscall"
"time"
"github.com/robfig/cron/v3"
dockerwatcher "github.com/alexei/docker-watcher"
"github.com/alexei/docker-watcher/internal/api"
"github.com/alexei/docker-watcher/internal/auth"
"github.com/alexei/docker-watcher/internal/config"
"github.com/alexei/docker-watcher/internal/crypto"
"github.com/alexei/docker-watcher/internal/backup"
"github.com/alexei/docker-watcher/internal/deployer"
"github.com/alexei/docker-watcher/internal/dns"
"github.com/alexei/docker-watcher/internal/docker"
"github.com/alexei/docker-watcher/internal/events"
"github.com/alexei/docker-watcher/internal/health"
"github.com/alexei/docker-watcher/internal/logging"
"github.com/alexei/docker-watcher/internal/notify"
"github.com/alexei/docker-watcher/internal/npm"
"github.com/alexei/docker-watcher/internal/proxy"
"github.com/alexei/docker-watcher/internal/registry"
"github.com/alexei/docker-watcher/internal/stale"
"github.com/alexei/docker-watcher/internal/store"
"github.com/alexei/docker-watcher/internal/webhook"
)
func main() {
// Initialize structured JSON logging.
logging.Setup()
dataDir := envOrDefault("DATA_DIR", "./data")
if err := os.MkdirAll(dataDir, 0o755); err != nil {
slog.Error("create data directory", "error", err)
os.Exit(1)
}
// Open database.
dbPath := filepath.Join(dataDir, "docker-watcher.db")
db, err := store.New(dbPath)
if err != nil {
slog.Error("open store", "error", err)
os.Exit(1)
}
defer db.Close()
// Derive encryption key from environment (required).
encKey, err := crypto.KeyFromEnv()
if err != nil {
slog.Error("ENCRYPTION_KEY is required — set it to a random 32+ character string")
os.Exit(1)
}
// Import seed config on first launch (idempotent).
seedPath := envOrDefault("SEED_FILE", "./docker-watcher.yaml")
if err := config.ImportSeed(db, seedPath); err != nil {
slog.Error("seed import", "error", err)
os.Exit(1)
}
// Ensure default admin user exists on first launch.
if err := ensureDefaultAdmin(db); err != nil {
slog.Error("ensure default admin", "error", err)
os.Exit(1)
}
// Initialize Docker client.
dockerClient, err := docker.New()
if err != nil {
slog.Error("create docker client", "error", err)
os.Exit(1)
}
defer dockerClient.Close()
// Read settings for NPM URL and polling interval.
settings, err := db.GetSettings()
if err != nil {
slog.Error("get settings", "error", err)
os.Exit(1)
}
// Initialize NPM client (used for NPM-specific endpoints like certificates).
npmURL := envOrDefault("NPM_URL", settings.NpmURL)
npmClient := npm.New(npmURL)
// Build proxy provider based on settings.
var proxyProvider proxy.Provider
switch settings.ProxyProvider {
case "none":
proxyProvider = proxy.NewNoneProvider()
slog.Info("proxy provider: none")
default:
// Default to NPM for backward compatibility (including "npm" and empty string).
npmPassword := ""
if settings.NpmPassword != "" {
decrypted, err := crypto.Decrypt(encKey, settings.NpmPassword)
if err != nil {
slog.Warn("failed to decrypt NPM password for proxy provider", "error", err)
} else {
npmPassword = decrypted
}
}
proxyProvider = proxy.NewNpmProvider(npmClient, settings.NpmEmail, npmPassword)
slog.Info("proxy provider: npm", "url", npmURL)
}
// Initialize services.
healthChecker := health.New()
notifier := notify.New()
eventBus := events.New()
// Auto-persist warn/error events from the event bus to the database.
stopLogger := eventBus.RegisterPersistentLogger(func(source, severity, message, metadata string) (int64, string, error) {
evt, err := db.InsertEvent(store.EventLog{
Source: source,
Severity: severity,
Message: message,
Metadata: metadata,
})
if err != nil {
return 0, "", err
}
return evt.ID, evt.CreatedAt, nil
})
defer stopLogger()
dep := deployer.New(dockerClient, proxyProvider, db, healthChecker, notifier, eventBus, encKey)
// Initialize webhook handler.
webhookHandler := webhook.NewHandler(db, dep, dockerClient)
// Ensure webhook secret exists.
_, err = webhook.EnsureWebhookSecret(db)
if err != nil {
slog.Error("ensure webhook secret", "error", err)
os.Exit(1)
}
slog.Info("webhook secret configured (use /api/settings/webhook-url to retrieve)")
// Initialize registry poller.
poller := registry.NewPoller(db, dep, encKey)
pollingInterval := envOrDefault("POLLING_INTERVAL", settings.PollingInterval)
if pollingInterval != "" {
if err := poller.Start(pollingInterval); err != nil {
slog.Warn("failed to start poller", "error", err)
}
}
// Initialize stale container scanner.
staleScanner := stale.New(db, dockerClient, eventBus)
if err := staleScanner.Start("1h"); err != nil {
slog.Warn("failed to start stale scanner", "error", err)
}
// Initialize proxy manager and health monitor.
proxyManager := proxy.NewManager(db, proxyProvider)
proxyHealth := proxy.NewHealthMonitor(db, eventBus)
if err := proxyHealth.Start("5m"); err != nil {
slog.Warn("failed to start proxy health monitor", "error", err)
}
// Start daily event log pruning cron job.
cronScheduler := cron.New()
if _, err := cronScheduler.AddFunc("@daily", func() {
pruned, err := db.PruneEvents(30)
if err != nil {
slog.Error("event log prune failed", "error", err)
return
}
if pruned > 0 {
slog.Info("pruned old event log entries", "count", pruned)
}
}); err != nil {
slog.Warn("failed to schedule event prune cron", "error", err)
}
cronScheduler.Start()
// Subscribe to error events and forward notifications.
notifySub := eventBus.Subscribe(func(evt events.Event) bool {
if evt.Type != events.EventLog {
return false
}
p, ok := evt.Payload.(events.EventLogPayload)
if !ok {
return false
}
return p.Severity == "error"
})
go func() {
for evt := range notifySub {
p, ok := evt.Payload.(events.EventLogPayload)
if !ok {
continue
}
currentSettings, err := db.GetSettings()
if err != nil || currentSettings.NotificationURL == "" {
continue
}
notifier.Send(currentSettings.NotificationURL, notify.Event{
Type: p.Source + "_error",
Project: p.Source,
Error: p.Message,
})
}
}()
// Initialize DNS provider from settings (nil for wildcard mode).
dnsProvider := initDNSProvider(settings, encKey)
if dnsProvider != nil {
dep.SetDNSProvider(dnsProvider)
proxyManager.SetDNSProvider(dnsProvider)
slog.Info("DNS provider initialized", "provider", settings.DNSProvider)
}
// Initialize backup engine.
backupEngine, err := backup.New(db, dbPath, dataDir)
if err != nil {
slog.Error("create backup engine", "error", err)
os.Exit(1)
}
// Clean orphaned backup files and prune on startup.
if cleaned, err := backupEngine.CleanOrphans(); err != nil {
slog.Warn("backup: clean orphans on startup", "error", err)
} else if cleaned > 0 {
slog.Info("backup: cleaned orphaned files on startup", "count", cleaned)
}
if settings.BackupRetentionCount > 0 {
if pruned, err := backupEngine.Prune(settings.BackupRetentionCount); err != nil {
slog.Warn("backup: prune on startup", "error", err)
} else if pruned > 0 {
slog.Info("backup: pruned old backups on startup", "count", pruned)
}
}
// Schedule autobackup if enabled. Track entry ID for rescheduling.
var backupCronID cron.EntryID
scheduleAutobackup := func(enabled bool, intervalHours int) {
// Remove existing schedule if any.
if backupCronID != 0 {
cronScheduler.Remove(backupCronID)
backupCronID = 0
slog.Info("autobackup: removed previous schedule")
}
if !enabled || intervalHours <= 0 {
return
}
interval := fmt.Sprintf("@every %dh", intervalHours)
id, err := cronScheduler.AddFunc(interval, func() {
b, err := backupEngine.CreateBackup("auto")
if err != nil {
slog.Error("autobackup failed", "error", err)
return
}
slog.Info("autobackup completed", "id", b.ID, "filename", b.Filename)
currentSettings, err := db.GetSettings()
if err == nil && currentSettings.BackupRetentionCount > 0 {
backupEngine.Prune(currentSettings.BackupRetentionCount)
}
})
if err != nil {
slog.Warn("failed to schedule autobackup", "error", err)
} else {
backupCronID = id
slog.Info("autobackup scheduled", "interval_hours", intervalHours)
}
}
scheduleAutobackup(settings.BackupEnabled, settings.BackupIntervalHours)
// Build API server.
apiServer := api.NewServer(db, dockerClient, npmClient, proxyProvider, dep, webhookHandler, eventBus, encKey)
apiServer.SetStaleScanner(staleScanner)
apiServer.SetProxyManager(proxyManager)
apiServer.SetBackupEngine(backupEngine)
apiServer.SetDBPath(dbPath)
apiServer.SetBackupSettingsChangedCallback(scheduleAutobackup)
apiServer.SetDNSProvider(dnsProvider)
apiServer.SetDNSProviderChangedCallback(func(provider dns.Provider) {
dep.SetDNSProvider(provider)
proxyManager.SetDNSProvider(provider)
})
router := apiServer.Router()
// Serve embedded static files for the SPA frontend.
// The embed.FS has "web/build" as a prefix, so we sub it to get the root.
webBuildFS, err := fs.Sub(dockerwatcher.WebBuildFS, "web/build")
if err != nil {
slog.Warn("embedded frontend not available", "error", err)
} else {
staticHandler := api.StaticHandler(webBuildFS)
// Handle all non-API routes with the static file server.
router.NotFound(staticHandler.ServeHTTP)
}
// Start HTTP server.
addr := envOrDefault("LISTEN_ADDR", ":8080")
httpServer := &http.Server{
Addr: addr,
Handler: router,
ReadTimeout: 30 * time.Second,
// WriteTimeout is disabled (0) to support SSE long-lived connections.
// Individual non-SSE handlers should use context timeouts as needed.
WriteTimeout: 0,
IdleTimeout: 120 * time.Second,
}
// Graceful shutdown.
done := make(chan os.Signal, 1)
signal.Notify(done, os.Interrupt, syscall.SIGTERM)
// Allow restore to trigger shutdown.
apiServer.SetShutdownFunc(func() {
done <- syscall.SIGTERM
})
go func() {
slog.Info("Docker Watcher started", "addr", addr)
if err := httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
slog.Error("HTTP server error", "error", err)
os.Exit(1)
}
}()
<-done
slog.Info("shutting down...")
// Stop accepting new work.
cronScheduler.Stop()
eventBus.Unsubscribe(notifySub)
proxyHealth.Stop()
staleScanner.Stop()
poller.Stop()
// Drain in-progress deploys and notifications.
dep.Drain()
notifier.Drain()
// Shut down HTTP server.
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
if err := httpServer.Shutdown(ctx); err != nil {
slog.Error("HTTP server shutdown error", "error", err)
}
// Close database.
if err := db.Close(); err != nil {
slog.Error("database close error", "error", err)
}
slog.Info("Docker Watcher stopped")
}
// envOrDefault reads an environment variable or returns the fallback value.
func envOrDefault(key, fallback string) string {
if v := os.Getenv(key); v != "" {
return v
}
return fallback
}
// ensureDefaultAdmin creates a default admin user on first launch if no users exist.
// The password comes from ADMIN_PASSWORD env var, defaulting to "admin".
func ensureDefaultAdmin(db *store.Store) error {
count, err := db.UserCount()
if err != nil {
return err
}
if count > 0 {
return nil // Users already exist, skip.
}
password := os.Getenv("ADMIN_PASSWORD")
if password == "" {
slog.Error("ADMIN_PASSWORD is required on first launch — set it to a secure password")
os.Exit(1)
}
hash, err := auth.HashPassword(password)
if err != nil {
return err
}
_, err = db.CreateUser(store.User{
Username: "admin",
PasswordHash: hash,
Email: "",
Role: "admin",
})
if err != nil {
// Ignore duplicate key errors (race condition on concurrent startup).
if errors.Is(err, store.ErrNotFound) {
return nil
}
return err
}
slog.Info("default admin user created", "username", "admin")
return nil
}
// initDNSProvider creates a DNS provider from settings. Returns nil for wildcard mode.
func initDNSProvider(settings store.Settings, encKey [32]byte) dns.Provider {
if settings.WildcardDNS || settings.DNSProvider == "" {
return nil
}
token := settings.CloudflareAPIToken
if token != "" {
decrypted, err := crypto.Decrypt(encKey, token)
if err != nil {
slog.Error("dns: failed to decrypt API token", "error", err)
return nil
}
token = decrypted
}
provider, err := dns.NewProvider(settings.DNSProvider, dns.Config{
Token: token,
ZoneID: settings.CloudflareZoneID,
})
if err != nil {
slog.Error("dns: failed to create provider", "error", err)
return nil
}
return provider
}