package main

import (
	"context"
	"errors"
	"fmt"
	"io/fs"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"path/filepath"
	"syscall"
	"time"

	"github.com/robfig/cron/v3"

	tinyforge "github.com/alexei/tinyforge"
	"github.com/alexei/tinyforge/internal/api"
	"github.com/alexei/tinyforge/internal/auth"
	"github.com/alexei/tinyforge/internal/backup"
	"github.com/alexei/tinyforge/internal/config"
	"github.com/alexei/tinyforge/internal/crypto"
	"github.com/alexei/tinyforge/internal/deployer"
	"github.com/alexei/tinyforge/internal/dns"
	"github.com/alexei/tinyforge/internal/docker"
	"github.com/alexei/tinyforge/internal/events"
	"github.com/alexei/tinyforge/internal/health"
	"github.com/alexei/tinyforge/internal/logging"
	"github.com/alexei/tinyforge/internal/logscanner"
	"github.com/alexei/tinyforge/internal/metricalert"
	"github.com/alexei/tinyforge/internal/notify"
	"github.com/alexei/tinyforge/internal/npm"
	"github.com/alexei/tinyforge/internal/proxy"
	"github.com/alexei/tinyforge/internal/reconciler"
	"github.com/alexei/tinyforge/internal/scheduler"
	"github.com/alexei/tinyforge/internal/stale"
	"github.com/alexei/tinyforge/internal/stats"
	"github.com/alexei/tinyforge/internal/store"
	"github.com/alexei/tinyforge/internal/volsnap"
	"github.com/alexei/tinyforge/internal/webhook"
	"github.com/alexei/tinyforge/internal/workload/plugin"

	// Plugin registrations: each blank-import runs its init() and registers
	// itself with internal/workload/plugin. Adding a new Source or Trigger
	// is a matter of dropping a new package and adding it to this list.
	_ "github.com/alexei/tinyforge/internal/workload/plugin/source/compose"
	_ "github.com/alexei/tinyforge/internal/workload/plugin/source/dockerfile"
	_ "github.com/alexei/tinyforge/internal/workload/plugin/source/image"
	_ "github.com/alexei/tinyforge/internal/workload/plugin/source/static"
	_ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/git"
	_ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/manual"
	_ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/registry"
	_ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/schedule"
)

// main sets up logging, delegates all real work to run, and exits with the
// status code run returns. It is the only place that calls os.Exit: os.Exit
// terminates the process without running deferred functions, so keeping it
// out of run guarantees the cleanup registered there (lockfile release,
// database close, background-worker stops) executes on every exit path.
func main() {
	// Initialize structured JSON logging.
	logging.Setup()

	code := run()
	if code == 0 {
		// Logged here so it follows run's deferred cleanup.
		slog.Info("Tinyforge stopped")
	}
	os.Exit(code)
}

// run wires up every subsystem, serves HTTP until a shutdown is requested
// (SIGINT/SIGTERM, the restore-triggered shutdown callback, or an HTTP
// server failure), then shuts everything down in order.
//
// It returns the process exit code: 0 after a signal-driven shutdown, 1 when
// startup fails or the HTTP server stops with an unexpected error. Failures
// are logged at the point they occur.
func run() int {
	dataDir := envOrDefault("DATA_DIR", "./data")
	if err := os.MkdirAll(dataDir, 0o755); err != nil {
		slog.Error("create data directory", "error", err)
		return 1
	}

	// Acquire single-instance lockfile BEFORE opening the DB. SQLite +
	// SetMaxOpenConns(1) does not protect against two Tinyforge processes
	// sharing a data directory; without this guard a misconfigured
	// systemd unit, container restart race, or `tinyforge` shell typo can
	// silently double-fire schedulers, double-poll registries, and
	// corrupt `extra_json` RMW. The lockfile is a PID file under
	// $DATA_DIR/tinyforge.lock — collisions with dead PIDs are reclaimed.
	releaseLock, err := store.AcquireLockfile(dataDir)
	if err != nil {
		slog.Error("could not acquire data-dir lock", "data_dir", dataDir, "error", err)
		return 1
	}
	defer releaseLock()

	// Open database.
	dbPath := filepath.Join(dataDir, "tinyforge.db")
	db, err := store.New(dbPath)
	if err != nil {
		slog.Error("open store", "error", err)
		return 1
	}
	// Close the database exactly once, and only after every deferred stop
	// registered below has run (defers execute last-in, first-out), so no
	// background worker is left ticking against a closed store.
	defer func() {
		if err := db.Close(); err != nil {
			slog.Error("database close error", "error", err)
		}
	}()

	// Derive encryption key from environment (required).
	encKey, err := crypto.KeyFromEnv()
	if err != nil {
		slog.Error("ENCRYPTION_KEY is required — set it to a random 32+ character string", "error", err)
		return 1
	}

	// One-shot migration: rewrite every legacy unprefixed-hex secret
	// in the DB into the new tf1: envelope form. Idempotent (gated by
	// schema_versions version 2). Lets the rest of the codebase treat
	// envelope-presence as a stable invariant for future key rotations.
	// Failures here are logged but non-fatal: a partial migration just
	// means some columns keep working through Decrypt's legacy
	// fallback until the next manual save re-encrypts them.
	if err := db.MigrateSecretsToEnvelope(store.EnvelopeMigrator{
		HasEnvelope: crypto.HasEnvelope,
		Decrypt:     func(v string) (string, error) { return crypto.Decrypt(encKey, v) },
		Encrypt:     func(v string) (string, error) { return crypto.Encrypt(encKey, v) },
	}); err != nil {
		slog.Warn("secrets envelope migration", "error", err)
	}

	// Import seed config on first launch (idempotent).
	seedPath := envOrDefault("SEED_FILE", "./tinyforge.yaml")
	if err := config.ImportSeed(db, seedPath); err != nil {
		slog.Error("seed import", "error", err)
		return 1
	}

	// Ensure default admin user exists on first launch.
	if err := ensureDefaultAdmin(db); err != nil {
		slog.Error("ensure default admin", "error", err)
		return 1
	}

	// Initialize Docker client.
	dockerClient, err := docker.New()
	if err != nil {
		slog.Error("create docker client", "error", err)
		return 1
	}
	defer dockerClient.Close()

	// Start the container index reconciler. Runs one boot pass and then
	// ticks every 30s. Boot pass populates the containers table from any
	// running containers that predate the workload refactor; subsequent
	// ticks catch state drift the deployer didn't witness.
	rec := reconciler.New(db, dockerClient, 30*time.Second)
	rec.Start(context.Background())
	defer rec.Stop()

	// Read settings for NPM URL and polling interval.
	settings, err := db.GetSettings()
	if err != nil {
		slog.Error("get settings", "error", err)
		return 1
	}

	// Initialize NPM client (used for NPM-specific endpoints like certificates).
	npmURL := envOrDefault("NPM_URL", settings.NpmURL)
	npmClient := npm.New(npmURL)

	// Build proxy provider based on settings.
	var proxyProvider proxy.Provider
	switch settings.ProxyProvider {
	case "none":
		proxyProvider = proxy.NewNoneProvider()
		slog.Info("proxy provider: none")
	case "traefik":
		proxyProvider = proxy.NewTraefikProvider(
			settings.TraefikEntrypoint,
			settings.TraefikCertResolver,
			settings.TraefikNetwork,
			settings.TraefikAPIURL,
		)
		slog.Info("proxy provider: traefik", "entrypoint", settings.TraefikEntrypoint)
	default:
		// Default to NPM for backward compatibility (including "npm" and empty string).
		npmPassword := ""
		if settings.NpmPassword != "" {
			decrypted, err := crypto.Decrypt(encKey, settings.NpmPassword)
			if err != nil {
				// Non-fatal: the provider is built with an empty password.
				slog.Warn("failed to decrypt NPM password for proxy provider", "error", err)
			} else {
				npmPassword = decrypted
			}
		}
		proxyProvider = proxy.NewNpmProvider(npmClient, settings.NpmEmail, npmPassword)
		slog.Info("proxy provider: npm", "url", npmURL)
	}

	// Initialize services.
	healthChecker := health.New()
	notifier := notify.New()
	eventBus := events.New()

	// Auto-persist warn/error events from the event bus to the database.
	stopLogger := eventBus.RegisterPersistentLogger(func(source, severity, message, metadata string) (int64, string, error) {
		evt, err := db.InsertEvent(store.EventLog{
			Source:   source,
			Severity: severity,
			Message:  message,
			Metadata: metadata,
		})
		if err != nil {
			return 0, "", err
		}
		return evt.ID, evt.CreatedAt, nil
	})
	defer stopLogger()

	// Event-trigger dispatcher: consume EventLog publishes off the bus
	// and fan out to operator-configured webhook actions.
	stopTriggerDispatcher := events.RegisterEventTriggerDispatcher(eventBus, db, notifier)
	defer stopTriggerDispatcher()

	dep := deployer.New(dockerClient, proxyProvider, db, healthChecker, notifier, eventBus, encKey)
	rec.SetPluginReconciler(dep)

	// Initialize webhook handler. The single inbound surface is
	// /api/webhook/triggers/{secret}; the plugin dispatcher wires the
	// trigger fan-out to the deployer.
	webhookHandler := webhook.NewHandler(db)
	webhookHandler.SetPluginDispatcher(dep)

	// Scheduler ticks every 30s and dispatches "schedule"-kind triggers
	// through the same FanOutForTrigger path as the inbound webhook. Boot
	// runs one sweep immediately so a daily schedule does not idle 24h
	// after a restart before catching up.
	sched := scheduler.New(db, func(ctx context.Context, trg store.Trigger, evt plugin.InboundEvent) error {
		results, err := webhookHandler.FanOutForTrigger(ctx, trg, evt)
		if err != nil {
			return err
		}
		// Log per-fire summary so a schedule that quietly fails on N
		// of M bindings is visible without parsing per-binding rows.
		var deployed, errored int
		for _, r := range results {
			switch {
			case r.Deployed:
				deployed++
			case r.Reason == webhook.ReasonBindingDisabled,
				r.Reason == webhook.ReasonNoMatch,
				r.Reason == webhook.ReasonPreviewNoop:
				// not a failure — silent
			default:
				errored++
			}
		}
		slog.Info("scheduler dispatch summary",
			"trigger", trg.Name,
			"bindings", len(results),
			"deployed", deployed,
			"errored", errored)
		return nil
	}, 30*time.Second)
	sched.Start(context.Background())
	defer sched.Stop()

	// Initialize stale container scanner.
	staleScanner := stale.New(db, dockerClient, eventBus)
	if err := staleScanner.Start("1h"); err != nil {
		slog.Warn("failed to start stale scanner", "error", err)
	}

	// Start daily event log pruning cron job.
	cronScheduler := cron.New()
	if _, err := cronScheduler.AddFunc("@daily", func() {
		pruned, err := db.PruneEvents(30)
		if err != nil {
			slog.Error("event log prune failed", "error", err)
			return
		}
		if pruned > 0 {
			slog.Info("pruned old event log entries", "count", pruned)
		}
	}); err != nil {
		slog.Warn("failed to schedule event prune cron", "error", err)
	}

	// Webhook delivery log: keep 14 days of audit trail.
	if _, err := cronScheduler.AddFunc("@daily", func() {
		cutoff := time.Now().UTC().AddDate(0, 0, -14).Format("2006-01-02 15:04:05")
		pruned, err := db.PruneWebhookDeliveriesBefore(cutoff)
		if err != nil {
			slog.Error("webhook delivery prune failed", "error", err)
			return
		}
		if pruned > 0 {
			slog.Info("pruned old webhook deliveries", "count", pruned)
		}
	}); err != nil {
		slog.Warn("failed to schedule webhook delivery prune cron", "error", err)
	}
	cronScheduler.Start()

	// Subscribe to error events and forward notifications.
	notifySub := eventBus.Subscribe(func(evt events.Event) bool {
		if evt.Type != events.EventLog {
			return false
		}
		p, ok := evt.Payload.(events.EventLogPayload)
		if !ok {
			return false
		}
		return p.Severity == "error"
	})
	go func() {
		for evt := range notifySub {
			p, ok := evt.Payload.(events.EventLogPayload)
			if !ok {
				continue
			}
			// Settings are re-read per event so a changed notification URL
			// applies without a restart.
			currentSettings, err := db.GetSettings()
			if err != nil || currentSettings.NotificationURL == "" {
				continue
			}
			notifier.SendSigned(currentSettings.NotificationURL, currentSettings.NotificationSecret, notify.TierSettings, notify.Event{
				Type:    p.Source + "_error",
				Project: p.Source,
				Error:   p.Message,
			})
		}
	}()

	// Initialize DNS provider from settings (nil for wildcard mode).
	dnsProvider := initDNSProvider(settings, encKey)
	if dnsProvider != nil {
		dep.SetDNSProvider(dnsProvider)
		slog.Info("DNS provider initialized", "provider", settings.DNSProvider)
	}

	// Initialize backup engine.
	backupEngine, err := backup.New(db, dbPath, dataDir)
	if err != nil {
		slog.Error("create backup engine", "error", err)
		return 1
	}
	dep.SetPreDeployBackuper(backupEngine)

	// Initialize volume-snapshot engine (per-workload data-volume archives).
	snapshotEngine, err := volsnap.New(db, dataDir)
	if err != nil {
		slog.Error("create snapshot engine", "error", err)
		return 1
	}

	// Reclaim snapshot files orphaned by workload deletes (rows CASCADE, files don't).
	if cleaned, err := snapshotEngine.CleanOrphans(); err != nil {
		slog.Warn("snapshots: clean orphans on startup", "error", err)
	} else if cleaned > 0 {
		slog.Info("snapshots: cleaned orphan files on startup", "count", cleaned)
	}

	// Clean orphaned backup files and prune on startup.
	if cleaned, err := backupEngine.CleanOrphans(); err != nil {
		slog.Warn("backup: clean orphans on startup", "error", err)
	} else if cleaned > 0 {
		slog.Info("backup: cleaned orphaned files on startup", "count", cleaned)
	}
	if settings.BackupRetentionCount > 0 {
		if pruned, err := backupEngine.Prune(settings.BackupRetentionCount); err != nil {
			slog.Warn("backup: prune on startup", "error", err)
		} else if pruned > 0 {
			slog.Info("backup: pruned old backups on startup", "count", pruned)
		}
	}

	// Schedule autobackup if enabled. Track entry ID for rescheduling.
	var backupCronID cron.EntryID
	scheduleAutobackup := func(enabled bool, intervalHours int) {
		// Remove existing schedule if any.
		if backupCronID != 0 {
			cronScheduler.Remove(backupCronID)
			backupCronID = 0
			slog.Info("autobackup: removed previous schedule")
		}
		if !enabled || intervalHours <= 0 {
			return
		}
		interval := fmt.Sprintf("@every %dh", intervalHours)
		id, err := cronScheduler.AddFunc(interval, func() {
			b, err := backupEngine.CreateBackup("auto")
			if err != nil {
				slog.Error("autobackup failed", "error", err)
				return
			}
			slog.Info("autobackup completed", "id", b.ID, "filename", b.Filename)

			// Retention is re-read on every run so a changed count takes
			// effect without rescheduling. Failures are logged, not fatal:
			// the backup itself already succeeded.
			currentSettings, err := db.GetSettings()
			if err != nil {
				slog.Warn("autobackup: read settings for retention prune", "error", err)
				return
			}
			if currentSettings.BackupRetentionCount > 0 {
				if _, err := backupEngine.Prune(currentSettings.BackupRetentionCount); err != nil {
					slog.Warn("autobackup: prune after backup", "error", err)
				}
			}
		})
		if err != nil {
			slog.Warn("failed to schedule autobackup", "error", err)
			return
		}
		backupCronID = id
		slog.Info("autobackup scheduled", "interval_hours", intervalHours)
	}
	scheduleAutobackup(settings.BackupEnabled, settings.BackupIntervalHours)

	// Initialize resource stats collector.
	statsCollector := stats.New(db, dockerClient)
	statsCollector.Start()

	// Log-scan manager: tails running containers and emits event_log
	// entries when log lines match operator-configured regex rules.
	logScanMgr := logscanner.NewManager(logscanner.Config{
		Rules:        db,
		Containers:   db,
		Docker:       dockerClient,
		Events:       db,
		Bus:          eventBus,
		PollInterval: 5 * time.Second,
	})
	if err := logScanMgr.Start(context.Background()); err != nil {
		slog.Warn("logscanner: initial rule load failed", "error", err)
	}
	defer logScanMgr.Stop()

	// Metric-alert manager: evaluates threshold rules against recent
	// container stats samples and emits event_log entries on breach.
	// The store satisfies RuleSource/SampleSource/EventSink; the event
	// bus is the Publisher. It is stopped explicitly in the shutdown
	// sequence below, which every path past this point goes through, so
	// no deferred Stop is registered (that would stop it twice).
	metricAlertMgr := metricalert.New(db, db, db, eventBus)
	metricAlertMgr.Start()

	// Build API server.
	apiServer := api.NewServer(db, dockerClient, npmClient, proxyProvider, dep, notifier, webhookHandler, eventBus, encKey)
	apiServer.SetStaleScanner(staleScanner)
	apiServer.SetLogScanReloader(logScanMgr)
	apiServer.SetBackupEngine(backupEngine)
	apiServer.SetSnapshotEngine(snapshotEngine)

	// Wire the restore lifecycle seam and reconcile any restore interrupted by a
	// crash, BEFORE the HTTP server starts serving — so a half-applied restore is
	// completed/reverted first and the restore endpoint is never reachable
	// without its safety net.
	snapshotEngine.SetLifecycle(&restoreLifecycle{dep: dep, docker: dockerClient, store: db})
	if n, err := snapshotEngine.RecoverInterruptedRestores(); err != nil {
		slog.Warn("snapshots: recover interrupted restores on startup", "error", err)
	} else if n > 0 {
		slog.Info("snapshots: recovered interrupted restores on startup", "count", n)
	}

	apiServer.SetDBPath(dbPath)
	apiServer.SetBackupSettingsChangedCallback(scheduleAutobackup)
	apiServer.SetDNSProvider(dnsProvider)
	apiServer.SetDNSProviderChangedCallback(func(provider dns.Provider) {
		dep.SetDNSProvider(provider)
	})
	apiServer.SetProxyProviderChangedCallback(func(provider proxy.Provider) {
		dep.SetProxyProvider(provider)
	})

	router := apiServer.Router()

	// Serve embedded static files for the SPA frontend.
	webBuildFS, err := fs.Sub(tinyforge.WebBuildFS, "web/build")
	if err != nil {
		slog.Warn("embedded frontend not available", "error", err)
	} else {
		staticHandler := api.StaticHandler(webBuildFS)
		router.NotFound(staticHandler.ServeHTTP)
	}

	// Start HTTP server.
	addr := envOrDefault("LISTEN_ADDR", ":8080")
	httpServer := &http.Server{
		Addr:        addr,
		Handler:     router,
		ReadTimeout: 30 * time.Second,
		// WriteTimeout is disabled (0) to support SSE long-lived connections.
		WriteTimeout: 0,
		IdleTimeout:  120 * time.Second,
	}

	// Graceful shutdown.
	done := make(chan os.Signal, 1)
	signal.Notify(done, os.Interrupt, syscall.SIGTERM)

	// Allow restore to trigger shutdown. The send is non-blocking: if the
	// one-slot buffer is already full a shutdown is pending anyway, and a
	// blocking send would hang the calling goroutine once run has stopped
	// receiving from done.
	apiServer.SetShutdownFunc(func() {
		select {
		case done <- syscall.SIGTERM:
		default:
		}
	})

	// A listen/serve failure is reported over this channel instead of calling
	// os.Exit from the goroutine, so the shutdown sequence and deferred
	// cleanup still run. Buffered so the goroutine never blocks on send.
	serverErr := make(chan error, 1)
	go func() {
		slog.Info("Tinyforge started", "addr", addr)
		if err := httpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			serverErr <- err
		}
	}()

	exitCode := 0
	select {
	case <-done:
	case err := <-serverErr:
		slog.Error("HTTP server error", "error", err)
		exitCode = 1
	}
	slog.Info("shutting down...")

	// Stop accepting new work.
	cronScheduler.Stop()
	eventBus.Unsubscribe(notifySub)
	staleScanner.Stop()
	statsCollector.Stop()
	metricAlertMgr.Stop()

	// Drain in-progress deploys and notifications.
	dep.Drain()
	webhookHandler.Drain()
	notifier.Drain()

	// Shut down HTTP server.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	if err := httpServer.Shutdown(ctx); err != nil {
		slog.Error("HTTP server shutdown error", "error", err)
	}

	// Remaining cleanup (log scanner, scheduler, event-bus consumers,
	// reconciler, Docker client, database, lockfile) runs via the defers
	// registered above, in reverse order of registration.
	return exitCode
}

// envOrDefault reads an environment variable or returns the fallback value.
// A variable that is set but empty is treated the same as an unset one.
func envOrDefault(key, fallback string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return fallback
}

// ensureDefaultAdmin creates a default admin user on first launch if no users exist.
//
// It is a no-op when the store already holds at least one user. Otherwise it
// reads the initial password from the ADMIN_PASSWORD environment variable and
// creates a user named "admin" with role "admin". It returns an error when
// ADMIN_PASSWORD is empty or when counting, hashing, or creating fails; the
// caller decides whether that is fatal.
func ensureDefaultAdmin(db *store.Store) error {
	count, err := db.UserCount()
	if err != nil {
		return fmt.Errorf("count users: %w", err)
	}
	if count > 0 {
		return nil // Users already exist, skip.
	}

	password := os.Getenv("ADMIN_PASSWORD")
	if password == "" {
		// Returned rather than exiting here so the caller's deferred cleanup runs.
		return errors.New("ADMIN_PASSWORD is required on first launch — set it to a secure password")
	}

	hash, err := auth.HashPassword(password)
	if err != nil {
		return fmt.Errorf("hash admin password: %w", err)
	}

	_, err = db.CreateUser(store.User{
		Username:     "admin",
		PasswordHash: hash,
		Email:        "",
		Role:         "admin",
	})
	if err != nil {
		// Ignore duplicate key errors (race condition on concurrent startup).
		// NOTE(review): this tests store.ErrNotFound, which does not obviously
		// correspond to a duplicate-key failure — confirm which sentinel
		// CreateUser returns on a uniqueness conflict.
		if errors.Is(err, store.ErrNotFound) {
			return nil
		}
		return fmt.Errorf("create admin user: %w", err)
	}

	slog.Info("default admin user created", "username", "admin")
	return nil
}

// initDNSProvider creates a DNS provider from settings. Returns nil for wildcard mode.
func initDNSProvider(settings store.Settings, encKey [32]byte) dns.Provider { if settings.WildcardDNS || settings.DNSProvider == "" { return nil } token := settings.CloudflareAPIToken if token != "" { decrypted, err := crypto.Decrypt(encKey, token) if err != nil { slog.Error("dns: failed to decrypt API token", "error", err) return nil } token = decrypted } provider, err := dns.NewProvider(settings.DNSProvider, dns.Config{ Token: token, ZoneID: settings.CloudflareZoneID, }) if err != nil { slog.Error("dns: failed to create provider", "error", err) return nil } return provider }