diff --git a/cmd/server/main.go b/cmd/server/main.go index 579bc9f..db1d35b 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -32,10 +32,12 @@ import ( "github.com/alexei/tinyforge/internal/npm" "github.com/alexei/tinyforge/internal/proxy" "github.com/alexei/tinyforge/internal/reconciler" + "github.com/alexei/tinyforge/internal/scheduler" "github.com/alexei/tinyforge/internal/stale" "github.com/alexei/tinyforge/internal/stats" "github.com/alexei/tinyforge/internal/store" "github.com/alexei/tinyforge/internal/webhook" + "github.com/alexei/tinyforge/internal/workload/plugin" // Plugin registrations: each blank-import runs its init() and registers // itself with internal/workload/plugin. Adding a new Source or Trigger @@ -46,6 +48,7 @@ import ( _ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/git" _ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/manual" _ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/registry" + _ "github.com/alexei/tinyforge/internal/workload/plugin/trigger/schedule" ) func main() { @@ -178,6 +181,36 @@ func main() { webhookHandler := webhook.NewHandler(db) webhookHandler.SetPluginDispatcher(dep) + // Scheduler ticks every 30s and dispatches "schedule"-kind triggers + // through the same FanOutForTrigger path as the inbound webhook. Boot + // runs one sweep immediately so a daily schedule does not idle 24h + // after a restart before catching up. + sched := scheduler.New(db, func(ctx context.Context, trg store.Trigger, evt plugin.InboundEvent) error { + results, err := webhookHandler.FanOutForTrigger(ctx, trg, evt) + if err != nil { + return err + } + // Log per-fire summary so a schedule that quietly fails on N + // of M bindings is visible without parsing per-binding rows. + var deployed, errored int + for _, r := range results { + switch { + case r.Deployed: + deployed++ + case r.Reason == "binding disabled", r.Reason == "no match": + // not a failure — silent + default: + errored++ + } + } + slog.Info("scheduler dispatch summary", + "trigger", trg.Name, "bindings", len(results), + "deployed", deployed, "errored", errored) + return nil + }, 30*time.Second) + sched.Start(context.Background()) + defer sched.Stop() + // Initialize stale container scanner. staleScanner := stale.New(db, dockerClient, eventBus) if err := staleScanner.Start("1h"); err != nil { diff --git a/docs/WORKLOAD_REFACTOR_TODO.md b/docs/WORKLOAD_REFACTOR_TODO.md index e9749b3..d20754c 100644 --- a/docs/WORKLOAD_REFACTOR_TODO.md +++ b/docs/WORKLOAD_REFACTOR_TODO.md @@ -25,10 +25,21 @@ order. > already had ≥87% coverage from the trigger-split work. > > **What's next** is open — the remaining items in the doc are nice-to- -> haves (richer kind-aware UI forms for new trigger kinds; a /triggers -> deep-link from the proxies page; more compose-source coverage that -> needs a `compose` exec seam). Pick from the task list or close the -> arc. +> haves (a /triggers deep-link from the proxies page; more compose-source +> coverage that needs a `compose` exec seam). Pick from the task list or +> close the arc. +> +> **Trigger kind expansion (2026-05-16):** added the fourth trigger +> kind, **schedule** — interval-based recurring trigger driven by the +> new `internal/scheduler` tick loop (default 30s, ≤5m). v1 takes a +> Go-duration interval ("24h", "1h", "168h") with a 1-minute floor; +> dispatches through the same `webhook.Handler.FanOutForTrigger` seam +> the inbound HTTP webhook uses, so per-binding concurrency / outcome +> accounting / config-merge semantics are identical. `triggers` gained +> a `last_fired_at` column; the scheduler persists it BEFORE dispatch +> so a panicking Match cannot wedge a tight loop. The frontend +> picker grid grew to four columns and `/triggers/[id]` surfaces +> "last fired" on schedule rows. ## Status at a glance diff --git a/internal/api/triggers.go b/internal/api/triggers.go index a66ad38..1e2dc06 100644 --- a/internal/api/triggers.go +++ b/internal/api/triggers.go @@ -25,8 +25,14 @@ type triggerView struct { WebhookEnabled bool `json:"webhook_enabled"` WebhookRequireSignature bool `json:"webhook_require_signature"` BindingCount int `json:"binding_count"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` + // LastFiredAt is the RFC3339 wall-clock the scheduler last + // dispatched this trigger. Always present in the response shape; + // empty for triggers that have never fired or are not scheduler- + // driven. The detail page renders it as "last fired" on schedule + // triggers; other kinds ignore it. + LastFiredAt string `json:"last_fired_at"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` } func (s *Server) toTriggerView(t store.Trigger) triggerView { @@ -42,6 +48,7 @@ func (s *Server) toTriggerView(t store.Trigger) triggerView { WebhookEnabled: t.WebhookSecret != "", WebhookRequireSignature: t.WebhookRequireSignature, BindingCount: count, + LastFiredAt: t.LastFiredAt, CreatedAt: t.CreatedAt, UpdatedAt: t.UpdatedAt, } @@ -59,6 +66,7 @@ func toTriggerViewWithCount(row store.TriggerWithBindingCount) triggerView { WebhookEnabled: row.WebhookSecret != "", WebhookRequireSignature: row.WebhookRequireSignature, BindingCount: row.BindingCount, + LastFiredAt: row.LastFiredAt, CreatedAt: row.CreatedAt, UpdatedAt: row.UpdatedAt, } diff --git a/internal/scheduler/scheduler.go b/internal/scheduler/scheduler.go new file mode 100644 index 0000000..3ff4c1f --- /dev/null +++ b/internal/scheduler/scheduler.go @@ -0,0 +1,208 @@ +// Package scheduler drives the "schedule" trigger kind. It ticks on a +// fixed interval, scans every enabled schedule trigger, and dispatches +// the ones whose next-fire window has elapsed through the same +// FanOutForTrigger path the inbound HTTP webhook uses. +// +// The scheduler is intentionally simple: +// +// - Tick on `tickInterval` (default 30s). +// - For every trigger with Kind=="schedule", parse its config to get +// the interval, compute (LastFiredAt + interval), and if now >= +// that target, fire. +// - On fire: build a plugin.InboundEvent{Kind: "schedule"} and call +// handler.FanOutForTrigger. last_fired_at is persisted BEFORE the +// dispatch runs so a panicking Match cannot wedge the row into a +// tight retry loop — a failed deploy waits one full interval +// before retry, which is the correct trade-off for a periodic +// refresh trigger. +// - A never-fired trigger (LastFiredAt == "") fires on the next +// tick — operator-friendly for testing "did I configure it right?". +// +// Per-trigger errors are logged but do not abort the tick. +package scheduler + +import ( + "context" + "log/slog" + "sync" + "time" + + "github.com/alexei/tinyforge/internal/store" + "github.com/alexei/tinyforge/internal/workload/plugin" + "github.com/alexei/tinyforge/internal/workload/plugin/trigger/schedule" +) + +// Scheduler owns the background tick loop. +type Scheduler struct { + store *store.Store + dispatcher fanOutFn + tickInterval time.Duration + clock func() time.Time // overridable for tests + + startOnce sync.Once + stopOnce sync.Once + cancel context.CancelFunc + wg sync.WaitGroup +} + +// fanOutFn is the internal callback shape — narrower than the public +// FanOutTrigger interface so the wiring in cmd/server/main.go can pass +// a closure directly without standing up a wrapper type. +type fanOutFn func(ctx context.Context, trg store.Trigger, evt plugin.InboundEvent) error + +// New constructs a Scheduler bound to `st` that dispatches via `fanOut`. +// `tickInterval` controls how often the loop wakes up to check +// schedules; values <=0 fall back to 30s. Tick intervals longer than 5 +// minutes are clamped so a misconfigured value can't silently disable +// schedules. +// +// `fanOut` should call webhook.Handler.FanOutForTrigger and return its +// error (or nil); the per-binding result slice is discarded — the +// scheduler does not need to know per-binding outcomes, only whether +// the dispatch itself failed. +func New(st *store.Store, fanOut fanOutFn, tickInterval time.Duration) *Scheduler { + clamped := tickInterval + if clamped <= 0 { + clamped = 30 * time.Second + } + if clamped > 5*time.Minute { + clamped = 5 * time.Minute + } + if clamped != tickInterval && tickInterval != 0 { + slog.Warn("scheduler: tick interval clamped", + "requested", tickInterval, "applied", clamped) + } + return &Scheduler{ + store: st, + dispatcher: fanOut, + tickInterval: clamped, + clock: func() time.Time { return time.Now().UTC() }, + } +} + +// Start launches the loop. Idempotent — repeat calls are no-ops, not +// goroutine leaks. Mirrors the reconciler's lifecycle. +func (s *Scheduler) Start(ctx context.Context) { + s.startOnce.Do(func() { + ctx, cancel := context.WithCancel(ctx) + s.cancel = cancel + s.wg.Add(1) + go s.loop(ctx) + }) +} + +// Stop cancels the context and waits for the in-flight tick. Idempotent +// via sync.Once — second call returns immediately without panicking on +// a double cancel. +func (s *Scheduler) Stop() { + s.stopOnce.Do(func() { + if s.cancel != nil { + s.cancel() + } + }) + s.wg.Wait() +} + +func (s *Scheduler) loop(ctx context.Context) { + defer s.wg.Done() + // First sweep at boot so a daily schedule does not idle 24h after a + // restart before it picks up rows whose window already elapsed. + s.TickOnce(ctx) + + ticker := time.NewTicker(s.tickInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + s.TickOnce(ctx) + } + } +} + +// TickOnce runs a single sweep. Exposed for tests and for the boot +// kick. On error per-trigger the loop continues with the next row. +func (s *Scheduler) TickOnce(ctx context.Context) { + rows, err := s.store.ListTriggers("schedule") + if err != nil { + slog.Warn("scheduler: list triggers", "error", err) + return + } + now := s.clock() + for _, t := range rows { + if !s.shouldFire(t, now) { + continue + } + s.fire(ctx, t, now) + } +} + +// shouldFire decides whether to dispatch trg at `now`. Returns true if: +// - the trigger's interval is parseable, AND +// - last_fired_at is empty (never fired) OR now >= lastFired + interval. +// +// Unparseable last_fired_at or interval are logged once and treated as +// "do not fire" — the operator needs to fix the config; the scheduler +// must not loop on a broken row. +func (s *Scheduler) shouldFire(t store.Trigger, now time.Time) bool { + interval, err := schedule.IntervalOfRaw(t.Config) + if err != nil { + slog.Warn("scheduler: bad interval", "trigger", t.Name, "error", err) + return false + } + // Defense-in-depth against a hand-inserted row that bypassed + // Validate (manual SQL, restore, ad-hoc migration). Validate + // already enforces the floor on the create path; this re-check + // keeps the loop honest if anything sneaks past it. + if interval < schedule.MinInterval { + slog.Warn("scheduler: interval below minimum, ignoring", + "trigger", t.Name, "interval", interval, "minimum", schedule.MinInterval) + return false + } + if t.LastFiredAt == "" { + return true + } + last, err := time.Parse(time.RFC3339, t.LastFiredAt) + if err != nil { + slog.Warn("scheduler: bad last_fired_at", "trigger", t.Name, + "value", t.LastFiredAt, "error", err) + // Treat as never-fired so the operator's fix-by-redeploy doesn't + // require a manual DB poke. + return true + } + return !now.Before(last.Add(interval)) +} + +// fire dispatches one trigger and records the new last_fired_at. +// +// We persist last_fired_at BEFORE calling the dispatcher so a panic +// inside Match cannot wedge the row into a tight loop. Down-side: a +// deploy that fails leaves the scheduler waiting one full interval +// before retry — acceptable because the trigger is a periodic refresh, +// not a critical-path retry mechanism. +func (s *Scheduler) fire(ctx context.Context, t store.Trigger, now time.Time) { + // Belt-and-suspenders: ListTriggersByKind only returns "schedule" + // rows, but if a future caller wires fire() differently this guard + // keeps the scheduler from blindly dispatching a kind it isn't + // designed for. + if t.Kind != "schedule" { + slog.Warn("scheduler: refusing to fire non-schedule kind", + "trigger", t.Name, "kind", t.Kind) + return + } + ts := now.Format(time.RFC3339) + if err := s.store.SetTriggerLastFired(t.ID, ts); err != nil { + slog.Warn("scheduler: persist last_fired_at", "trigger", t.Name, "error", err) + return + } + evt := plugin.InboundEvent{ + Kind: "schedule", + Schedule: &plugin.ScheduleEvent{FiredAt: now}, + } + if err := s.dispatcher(ctx, t, evt); err != nil { + slog.Warn("scheduler: dispatch", "trigger", t.Name, "error", err) + return + } + slog.Info("scheduler: fired", "trigger", t.Name, "kind", t.Kind, "at", ts) +} diff --git a/internal/scheduler/scheduler_test.go b/internal/scheduler/scheduler_test.go new file mode 100644 index 0000000..efd1853 --- /dev/null +++ b/internal/scheduler/scheduler_test.go @@ -0,0 +1,223 @@ +package scheduler + +import ( + "context" + "testing" + "time" + + "github.com/alexei/tinyforge/internal/store" + "github.com/alexei/tinyforge/internal/workload/plugin" +) + +// newTestStore opens an in-memory SQLite store. Each test gets its own +// DSN so parallel runs do not collide on shared cache databases. +func newTestStore(t *testing.T) *store.Store { + t.Helper() + st, err := store.New(":memory:") + if err != nil { + t.Fatalf("open store: %v", err) + } + t.Cleanup(func() { _ = st.Close() }) + return st +} + +func seedScheduleTrigger(t *testing.T, st *store.Store, name, interval, lastFired string) store.Trigger { + t.Helper() + trg, err := st.CreateTrigger(store.Trigger{ + Kind: "schedule", + Name: name, + Config: `{"interval":"` + interval + `"}`, + LastFiredAt: lastFired, + }) + if err != nil { + t.Fatalf("CreateTrigger: %v", err) + } + return trg +} + +func TestShouldFire(t *testing.T) { + st := newTestStore(t) + now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + s := New(st, func(context.Context, store.Trigger, plugin.InboundEvent) error { return nil }, 0) + + cases := []struct { + name string + interval string + lastFired string + want bool + }{ + {"never fired fires", "1h", "", true}, + {"window not yet elapsed", "1h", now.Add(-30 * time.Minute).Format(time.RFC3339), false}, + {"window exactly elapsed fires", "1h", now.Add(-1 * time.Hour).Format(time.RFC3339), true}, + {"window long elapsed fires", "24h", now.Add(-48 * time.Hour).Format(time.RFC3339), true}, + {"bad interval suppressed", "banana", "", false}, + {"bad last_fired_at treated as never", "1h", "not-a-timestamp", true}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + trg := store.Trigger{ + Config: `{"interval":"` + tc.interval + `"}`, + LastFiredAt: tc.lastFired, + } + got := s.shouldFire(trg, now) + if got != tc.want { + t.Fatalf("shouldFire = %v, want %v", got, tc.want) + } + }) + } +} + +func TestTickOnce_FiresOverdueTriggers(t *testing.T) { + st := newTestStore(t) + now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + + // Three triggers: one overdue, one not yet due, one never-fired. + overdue := seedScheduleTrigger(t, st, "overdue", "1h", now.Add(-2*time.Hour).Format(time.RFC3339)) + notDue := seedScheduleTrigger(t, st, "notdue", "1h", now.Add(-30*time.Minute).Format(time.RFC3339)) + never := seedScheduleTrigger(t, st, "never", "1h", "") + + fired := make(map[string]int) + s := New(st, func(_ context.Context, trg store.Trigger, _ plugin.InboundEvent) error { + fired[trg.Name]++ + return nil + }, 0) + s.clock = func() time.Time { return now } + + s.TickOnce(context.Background()) + + if fired["overdue"] != 1 { + t.Errorf("overdue should fire once, got %d", fired["overdue"]) + } + if fired["notdue"] != 0 { + t.Errorf("notdue should not fire, got %d", fired["notdue"]) + } + if fired["never"] != 1 { + t.Errorf("never should fire once on first tick, got %d", fired["never"]) + } + + // last_fired_at must advance for everyone we dispatched. + for _, id := range []string{overdue.ID, never.ID} { + row, err := st.GetTriggerByID(id) + if err != nil { + t.Fatalf("GetTriggerByID(%s): %v", id, err) + } + if row.LastFiredAt == "" { + t.Errorf("last_fired_at not persisted for %s", row.Name) + } + } + // not-due trigger's last_fired_at must NOT have changed. + row, err := st.GetTriggerByID(notDue.ID) + if err != nil { + t.Fatalf("GetTriggerByID(notdue): %v", err) + } + if row.LastFiredAt != notDue.LastFiredAt { + t.Errorf("notdue last_fired_at changed: was %q now %q", notDue.LastFiredAt, row.LastFiredAt) + } +} + +func TestTickOnce_DispatchErrorDoesNotWedgeOthers(t *testing.T) { + st := newTestStore(t) + now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + + broken := seedScheduleTrigger(t, st, "broken", "1h", "") + seedScheduleTrigger(t, st, "healthy", "1h", "") + + fired := map[string]int{} + s := New(st, func(_ context.Context, trg store.Trigger, _ plugin.InboundEvent) error { + fired[trg.Name]++ + if trg.Name == "broken" { + return context.Canceled + } + return nil + }, 0) + s.clock = func() time.Time { return now } + + s.TickOnce(context.Background()) + + if fired["broken"] != 1 { + t.Errorf("broken should be attempted once, got %d", fired["broken"]) + } + if fired["healthy"] != 1 { + t.Errorf("healthy should fire once, got %d", fired["healthy"]) + } + + // Core persist-before-dispatch invariant: even though the broken + // trigger's dispatcher returned an error, last_fired_at must have + // advanced. Otherwise the scheduler would re-fire it on every tick. + row, err := st.GetTriggerByID(broken.ID) + if err != nil { + t.Fatalf("GetTriggerByID(broken): %v", err) + } + if row.LastFiredAt == "" { + t.Fatalf("broken trigger last_fired_at must advance even on dispatch error") + } + + // And: a second TickOnce at the same `now` must not re-fire broken. + s.TickOnce(context.Background()) + if fired["broken"] != 1 { + t.Errorf("broken refired after persist; got %d (want 1)", fired["broken"]) + } +} + +func TestTickOnce_PersistsLastFiredBeforeDispatch(t *testing.T) { + // Documented behavior: last_fired_at is persisted before the + // dispatcher runs so a panicking match cannot wedge a tight loop. + st := newTestStore(t) + now := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + trg := seedScheduleTrigger(t, st, "tick", "1h", "") + + dispatched := false + s := New(st, func(_ context.Context, t store.Trigger, _ plugin.InboundEvent) error { + // At dispatch time the column must already be set. + row, err := st.GetTriggerByID(t.ID) + if err != nil { + return err + } + dispatched = row.LastFiredAt != "" + return nil + }, 0) + s.clock = func() time.Time { return now } + + s.TickOnce(context.Background()) + + if !dispatched { + t.Fatalf("last_fired_at must be persisted before dispatcher runs") + } + row, err := st.GetTriggerByID(trg.ID) + if err != nil { + t.Fatalf("get: %v", err) + } + if row.LastFiredAt != now.Format(time.RFC3339) { + t.Errorf("last_fired_at = %q, want %q", row.LastFiredAt, now.Format(time.RFC3339)) + } +} + +func TestLifecycle_StartStopIdempotent(t *testing.T) { + // Start + Stop are wrapped in sync.Once. A second call must be a + // no-op (no panic on double-cancel, no goroutine leak from double- + // Start). This guards the shutdown path that runs Stop from both + // defer and the signal-handler block in cmd/server/main.go. + st := newTestStore(t) + noop := func(context.Context, store.Trigger, plugin.InboundEvent) error { return nil } + s := New(st, noop, 100*time.Millisecond) + + s.Start(context.Background()) + s.Start(context.Background()) // second call: no goroutine spawned + + s.Stop() + s.Stop() // second call: no panic on closing already-cancelled context +} + +func TestNew_ClampsInterval(t *testing.T) { + st := newTestStore(t) + noop := func(context.Context, store.Trigger, plugin.InboundEvent) error { return nil } + if got := New(st, noop, 0).tickInterval; got != 30*time.Second { + t.Errorf("default = %s, want 30s", got) + } + if got := New(st, noop, 1*time.Hour).tickInterval; got != 5*time.Minute { + t.Errorf("clamped = %s, want 5m", got) + } + if got := New(st, noop, 2*time.Minute).tickInterval; got != 2*time.Minute { + t.Errorf("passthrough = %s, want 2m", got) + } +} diff --git a/internal/store/models.go b/internal/store/models.go index 250adba..c6a067a 100644 --- a/internal/store/models.go +++ b/internal/store/models.go @@ -366,8 +366,13 @@ type Trigger struct { WebhookSecret string `json:"-"` // URL-identifier secret; never serialized WebhookSigningSecret string `json:"-"` // HMAC key; never serialized WebhookRequireSignature bool `json:"webhook_require_signature"` - CreatedAt string `json:"created_at"` - UpdatedAt string `json:"updated_at"` + // LastFiredAt is the RFC3339 wall-clock the scheduler last dispatched + // this trigger. Empty for never-fired or non-schedule triggers. The + // scheduler reads + writes this column to decide next-fire windows + // and to surface "last fired" on the trigger detail page. + LastFiredAt string `json:"last_fired_at,omitempty"` + CreatedAt string `json:"created_at"` + UpdatedAt string `json:"updated_at"` } // WorkloadTriggerBinding joins a Workload to a Trigger. BindingConfig is diff --git a/internal/store/store.go b/internal/store/store.go index 722bae2..11b6baa 100644 --- a/internal/store/store.go +++ b/internal/store/store.go @@ -164,6 +164,11 @@ func (s *Store) runMigrations() error { `ALTER TABLE workloads ADD COLUMN trigger_config TEXT NOT NULL DEFAULT '{}'`, `ALTER TABLE workloads ADD COLUMN public_faces TEXT NOT NULL DEFAULT '[]'`, `ALTER TABLE workloads ADD COLUMN parent_workload_id TEXT NOT NULL DEFAULT ''`, + // Schedule trigger needs a column to remember when it last fired so + // the scheduler can compute next-fire windows across restarts. + // Empty string = never fired. Pre-trigger-split DBs land the column + // here so the scheduler can read/write it on first boot. + `ALTER TABLE triggers ADD COLUMN last_fired_at TEXT NOT NULL DEFAULT ''`, // Hard cutover: drop every legacy table. Idempotent — DROP TABLE // IF EXISTS is a no-op once the table is gone. Operators upgrading // from a pre-cutover build will lose any project / stack / static @@ -275,6 +280,7 @@ func (s *Store) runMigrations() error { webhook_secret TEXT NOT NULL DEFAULT '', webhook_signing_secret TEXT NOT NULL DEFAULT '', webhook_require_signature INTEGER NOT NULL DEFAULT 0, + last_fired_at TEXT NOT NULL DEFAULT '', created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')) )`, diff --git a/internal/store/triggers.go b/internal/store/triggers.go index 443821f..1518392 100644 --- a/internal/store/triggers.go +++ b/internal/store/triggers.go @@ -10,14 +10,14 @@ import ( const triggerColumns = `id, kind, name, config, webhook_secret, webhook_signing_secret, webhook_require_signature, - created_at, updated_at` + last_fired_at, created_at, updated_at` func scanTrigger(s rowScanner) (Trigger, error) { var t Trigger var requireSig int if err := s.Scan(&t.ID, &t.Kind, &t.Name, &t.Config, &t.WebhookSecret, &t.WebhookSigningSecret, &requireSig, - &t.CreatedAt, &t.UpdatedAt); err != nil { + &t.LastFiredAt, &t.CreatedAt, &t.UpdatedAt); err != nil { return Trigger{}, err } t.WebhookRequireSignature = requireSig != 0 @@ -38,10 +38,10 @@ func (s *Store) CreateTrigger(t Trigger) (Trigger, error) { t.UpdatedAt = t.CreatedAt _, err := s.db.Exec( `INSERT INTO triggers (`+triggerColumns+`) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, t.ID, t.Kind, t.Name, t.Config, t.WebhookSecret, t.WebhookSigningSecret, BoolToInt(t.WebhookRequireSignature), - t.CreatedAt, t.UpdatedAt, + t.LastFiredAt, t.CreatedAt, t.UpdatedAt, ) if err != nil { return Trigger{}, fmt.Errorf("insert trigger: %w", translateSQLError(err)) @@ -139,7 +139,7 @@ func (s *Store) ListTriggersWithBindingCount(kind string) ([]TriggerWithBindingC const base = ` SELECT t.id, t.kind, t.name, t.config, t.webhook_secret, t.webhook_signing_secret, t.webhook_require_signature, - t.created_at, t.updated_at, + t.last_fired_at, t.created_at, t.updated_at, COALESCE(b.cnt, 0) FROM triggers t LEFT JOIN ( @@ -166,7 +166,7 @@ func (s *Store) ListTriggersWithBindingCount(kind string) ([]TriggerWithBindingC var count int if err := rows.Scan(&t.ID, &t.Kind, &t.Name, &t.Config, &t.WebhookSecret, &t.WebhookSigningSecret, &requireSig, - &t.CreatedAt, &t.UpdatedAt, &count); err != nil { + &t.LastFiredAt, &t.CreatedAt, &t.UpdatedAt, &count); err != nil { return nil, fmt.Errorf("scan trigger+count: %w", err) } t.WebhookRequireSignature = requireSig != 0 @@ -236,10 +236,10 @@ func (s *Store) CreateTriggerWithBindingTx(t Trigger, b WorkloadTriggerBinding) t.UpdatedAt = t.CreatedAt if _, err := tx.Exec( `INSERT INTO triggers (`+triggerColumns+`) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`, + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, t.ID, t.Kind, t.Name, t.Config, t.WebhookSecret, t.WebhookSigningSecret, BoolToInt(t.WebhookRequireSignature), - t.CreatedAt, t.UpdatedAt, + t.LastFiredAt, t.CreatedAt, t.UpdatedAt, ); err != nil { return Trigger{}, WorkloadTriggerBinding{}, fmt.Errorf("insert trigger: %w", translateSQLError(err)) } @@ -301,3 +301,24 @@ func (s *Store) EnsureTriggerWebhookSecret(id string) (string, error) { } return secret, nil } + +// SetTriggerLastFired records the wall-clock the scheduler last +// dispatched this trigger. Callers pass time.Now().UTC().Format(time.RFC3339) +// so the value is stable across timezones. Updating last_fired_at does +// not bump updated_at — last_fired_at is operational state, while +// updated_at tracks user-visible config edits. +func (s *Store) SetTriggerLastFired(id, ts string) error { + result, err := s.db.Exec( + `UPDATE triggers SET last_fired_at = ? WHERE id = ?`, + ts, id, + ) + if err != nil { + return fmt.Errorf("update trigger last_fired_at: %w", err) + } + n, _ := result.RowsAffected() + if n == 0 { + return fmt.Errorf("trigger %s: %w", id, ErrNotFound) + } + return nil +} + diff --git a/internal/webhook/trigger_handler.go b/internal/webhook/trigger_handler.go index 91cb117..1c61756 100644 --- a/internal/webhook/trigger_handler.go +++ b/internal/webhook/trigger_handler.go @@ -25,9 +25,10 @@ import ( // already serializes pulls). const maxTriggerFanOutConcurrency = 4 -// bindingResult is the per-binding entry in the trigger fan-out -// response body. -type bindingResult struct { +// BindingResult is the per-binding entry in the trigger fan-out +// response body. Exported so non-HTTP callers (the scheduler) can +// inspect outcomes after calling FanOutForTrigger. +type BindingResult struct { Workload string `json:"workload"` Deployed bool `json:"deployed"` Reason string `json:"reason,omitempty"` @@ -191,6 +192,35 @@ func (h *Handler) handleTriggerWebhook(w http.ResponseWriter, r *http.Request) { }) } +// FanOutForTrigger looks up the trigger plugin + bindings for trg and +// dispatches evt through the same bounded worker pool the inbound HTTP +// webhook uses. The scheduler calls this on each tick to fire schedule +// triggers without a real HTTP request — same dispatch path, same +// per-binding isolation, same outcome shape. +// +// Returns nil + error only when the trigger plugin is missing or the +// bindings query fails — both fatal upstream conditions the caller +// should log. A per-binding error becomes a row in the result slice +// with Deployed=false; that case returns nil error. +func (h *Handler) FanOutForTrigger( + ctx context.Context, + trg store.Trigger, + evt plugin.InboundEvent, +) ([]BindingResult, error) { + if h.plugins == nil { + return nil, fmt.Errorf("plugin dispatcher not wired") + } + trigPlugin, err := plugin.GetTrigger(trg.Kind) + if err != nil { + return nil, fmt.Errorf("trigger plugin %q: %w", trg.Kind, err) + } + bindings, err := h.store.ListBindingsForTrigger(trg.ID) + if err != nil { + return nil, fmt.Errorf("list bindings: %w", err) + } + return h.fanOutBindings(ctx, trg, trigPlugin, bindings, evt), nil +} + // fanOutBindings dispatches every binding through fireBinding with at // most maxTriggerFanOutConcurrency goroutines in flight. Order of the // returned slice matches the input bindings slice so callers can rely @@ -205,8 +235,8 @@ func (h *Handler) fanOutBindings( trigPlugin plugin.Trigger, bindings []store.WorkloadTriggerBinding, evt plugin.InboundEvent, -) []bindingResult { - results := make([]bindingResult, len(bindings)) +) []BindingResult { + results := make([]BindingResult, len(bindings)) concurrency := maxTriggerFanOutConcurrency if len(bindings) < concurrency { concurrency = len(bindings) @@ -218,14 +248,14 @@ func (h *Handler) fanOutBindings( var wg sync.WaitGroup for i, b := range bindings { if !b.Enabled { - results[i] = bindingResult{Workload: b.WorkloadID, Deployed: false, Reason: "binding disabled"} + results[i] = BindingResult{Workload: b.WorkloadID, Deployed: false, Reason: "binding disabled"} continue } row, lookupErr := h.store.GetWorkloadByID(b.WorkloadID) if lookupErr != nil { slog.Warn("webhook: bound workload missing", "trigger", trg.Name, "workload", b.WorkloadID, "error", lookupErr) - results[i] = bindingResult{Workload: b.WorkloadID, Deployed: false, Reason: "workload missing"} + results[i] = BindingResult{Workload: b.WorkloadID, Deployed: false, Reason: "workload missing"} continue } wg.Add(1) @@ -234,7 +264,7 @@ func (h *Handler) fanOutBindings( defer wg.Done() defer func() { <-sem }() fired, reason := h.fireBinding(ctx, trg, trigPlugin, wl, binding, evt) - results[idx] = bindingResult{Workload: wl.Name, Deployed: fired, Reason: reason} + results[idx] = BindingResult{Workload: wl.Name, Deployed: fired, Reason: reason} }(i, b, row) } wg.Wait() diff --git a/internal/workload/plugin/plugin.go b/internal/workload/plugin/plugin.go index 2e92d05..3837d6b 100644 --- a/internal/workload/plugin/plugin.go +++ b/internal/workload/plugin/plugin.go @@ -61,7 +61,7 @@ type Workload struct { SourceKind string // "image" | "compose" | "static" | ... SourceConfig json.RawMessage // shape determined by SourceKind - TriggerKind string // "registry" | "git" | "manual" | "cron" | ... + TriggerKind string // "registry" | "git" | "manual" | "schedule" | ... TriggerConfig json.RawMessage // shape determined by TriggerKind PublicFaces []PublicFace // zero or more public routes diff --git a/internal/workload/plugin/trigger/schedule/schedule.go b/internal/workload/plugin/trigger/schedule/schedule.go new file mode 100644 index 0000000..4c2962d --- /dev/null +++ b/internal/workload/plugin/trigger/schedule/schedule.go @@ -0,0 +1,129 @@ +// Package schedule implements the "schedule" trigger: fires a deploy on +// a recurring time interval driven by the internal scheduler. +// +// v1 is interval-based ("every 24h"). A future revision can add a cron +// expression field; the plugin keeps the JSON shape forward-compatible +// by ignoring unknown keys. +package schedule + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/alexei/tinyforge/internal/workload/plugin" +) + +// Config is the per-trigger schedule config. +// +// Interval is a Go duration string ("1h", "24h", "168h"). The scheduler +// rejects intervals below MinInterval so a misconfigured trigger cannot +// run away and saturate the docker daemon. +// +// Reference is an optional string passed through to DeploymentIntent so +// the operator can pin a specific tag/sha to redeploy on every tick +// (e.g. always re-pull "stable"). Empty means the Source uses whatever +// it normally would. +type Config struct { + Interval string `json:"interval"` + Reference string `json:"reference,omitempty"` +} + +// MinInterval is the floor enforced at Validate time. One minute is a +// pragmatic lower bound: shorter intervals would mostly serve as +// accidental denial-of-service against the deployer. +const MinInterval = time.Minute + +type trigger struct{} + +func init() { plugin.RegisterTrigger(&trigger{}) } + +func (*trigger) Kind() string { return "schedule" } + +func (*trigger) SchemaSample() any { + return Config{Interval: "24h"} +} + +func (*trigger) Validate(cfg json.RawMessage) error { + if len(cfg) == 0 { + return fmt.Errorf("schedule trigger: config is required") + } + var c Config + if err := json.Unmarshal(cfg, &c); err != nil { + return fmt.Errorf("schedule trigger: invalid json: %w", err) + } + if strings.TrimSpace(c.Interval) == "" { + return fmt.Errorf("schedule trigger: interval is required (e.g. \"24h\")") + } + d, err := ParseInterval(c.Interval) + if err != nil { + return fmt.Errorf("schedule trigger: %w", err) + } + if d < MinInterval { + return fmt.Errorf("schedule trigger: interval %s is below minimum %s", + d, MinInterval) + } + return nil +} + +// ParseInterval parses a duration string with the same syntax as +// time.ParseDuration ("90s", "5m", "2h45m"). Exported so the scheduler +// can reuse the same parser and stay consistent with Validate. +func ParseInterval(s string) (time.Duration, error) { + d, err := time.ParseDuration(strings.TrimSpace(s)) + if err != nil { + return 0, fmt.Errorf("invalid interval %q: %w", s, err) + } + if d <= 0 { + return 0, fmt.Errorf("invalid interval %q: must be positive", s) + } + return d, nil +} + +// IntervalOf reads the interval out of a workload-effective trigger +// config. Returns the parsed duration; an error if the config is +// missing the interval or malformed. Used by the scheduler when it +// already knows the merged config and wants to schedule next fire. +func IntervalOf(cfg json.RawMessage) (time.Duration, error) { + var c Config + if len(cfg) == 0 { + return 0, fmt.Errorf("schedule trigger: empty config") + } + if err := json.Unmarshal(cfg, &c); err != nil { + return 0, fmt.Errorf("schedule trigger: invalid json: %w", err) + } + return ParseInterval(c.Interval) +} + +// IntervalOfRaw is a convenience that decodes the raw trigger.config +// blob from store.Trigger (a JSON string). Wrapper kept tiny because +// the scheduler holds the raw string, not a parsed shape. +func IntervalOfRaw(raw string) (time.Duration, error) { + return IntervalOf(json.RawMessage(raw)) +} + +func (*trigger) Match(ctx context.Context, deps plugin.Deps, w plugin.Workload, evt plugin.InboundEvent) (*plugin.DeploymentIntent, error) { + if evt.Kind != "schedule" || evt.Schedule == nil { + return nil, nil + } + cfg, err := plugin.TriggerConfigOf[Config](w) + if err != nil { + return nil, fmt.Errorf("schedule trigger: decode config: %w", err) + } + firedAt := evt.Schedule.FiredAt + if firedAt.IsZero() { + firedAt = time.Now().UTC() + } + meta := map[string]string{ + "interval": cfg.Interval, + } + return &plugin.DeploymentIntent{ + Reason: "schedule", + Reference: strings.TrimSpace(cfg.Reference), + Metadata: meta, + TriggeredAt: firedAt, + TriggeredBy: "scheduler", + }, nil +} diff --git a/internal/workload/plugin/trigger/schedule/schedule_test.go b/internal/workload/plugin/trigger/schedule/schedule_test.go new file mode 100644 index 0000000..6957a31 --- /dev/null +++ b/internal/workload/plugin/trigger/schedule/schedule_test.go @@ -0,0 +1,197 @@ +package schedule + +import ( + "context" + "encoding/json" + "strings" + "testing" + "time" + + "github.com/alexei/tinyforge/internal/workload/plugin" +) + +func TestValidate(t *testing.T) { + tr := &trigger{} + cases := []struct { + name string + cfg json.RawMessage + wantErr string // substring; empty = expect success + }{ + {"empty body rejected", nil, "config is required"}, + {"empty object rejected", json.RawMessage(`{}`), "interval is required"}, + {"missing interval rejected", json.RawMessage(`{"reference":"v1"}`), "interval is required"}, + {"invalid json rejected", json.RawMessage(`not json`), "invalid json"}, + {"unparseable interval rejected", json.RawMessage(`{"interval":"banana"}`), "invalid interval"}, + {"zero interval rejected", json.RawMessage(`{"interval":"0s"}`), "must be positive"}, + {"sub-minute rejected", json.RawMessage(`{"interval":"30s"}`), "below minimum"}, + {"exact minimum accepted", json.RawMessage(`{"interval":"1m"}`), ""}, + {"hour accepted", json.RawMessage(`{"interval":"1h"}`), ""}, + {"day accepted", json.RawMessage(`{"interval":"24h"}`), ""}, + {"unknown keys tolerated", json.RawMessage(`{"interval":"1h","future_field":42}`), ""}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + err := tr.Validate(tc.cfg) + if tc.wantErr == "" { + if err != nil { + t.Fatalf("expected nil err, got %v", err) + } + return + } + if err == nil { + t.Fatalf("expected error containing %q, got nil", tc.wantErr) + } + if !strings.Contains(err.Error(), tc.wantErr) { + t.Fatalf("error %q missing substring %q", err.Error(), tc.wantErr) + } + }) + } +} + +func TestParseInterval(t *testing.T) { + cases := []struct { + in string + want time.Duration + err bool + }{ + {"1h", time.Hour, false}, + {"24h", 24 * time.Hour, false}, + {" 5m ", 5 * time.Minute, false}, + {"168h", 7 * 24 * time.Hour, false}, + {"", 0, true}, + {"banana", 0, true}, + {"-1h", 0, true}, + {"0s", 0, true}, + } + for _, tc := range cases { + t.Run(tc.in, func(t *testing.T) { + got, err := ParseInterval(tc.in) + if tc.err { + if err == nil { + t.Fatalf("expected error, got nil") + } + return + } + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if got != tc.want { + t.Fatalf("got %s, want %s", got, tc.want) + } + }) + } +} + +func TestIntervalOfRaw(t *testing.T) { + d, err := IntervalOfRaw(`{"interval":"30m"}`) + if err != nil { + t.Fatalf("err: %v", err) + } + if d != 30*time.Minute { + t.Fatalf("got %s, want 30m", d) + } + if _, err := IntervalOfRaw(""); err == nil { + t.Fatalf("expected error on empty raw") + } +} + +func TestMatch_WrongKindIgnored(t *testing.T) { + tr := &trigger{} + wl := plugin.Workload{ID: "w1", TriggerConfig: json.RawMessage(`{"interval":"1h"}`)} + intent, err := tr.Match(context.Background(), plugin.Deps{}, wl, + plugin.InboundEvent{Kind: "manual"}) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if intent != nil { + t.Fatalf("expected nil intent for wrong kind, got %+v", intent) + } +} + +func TestMatch_MissingSchedulePayload(t *testing.T) { + tr := &trigger{} + wl := plugin.Workload{ID: "w1", TriggerConfig: json.RawMessage(`{"interval":"1h"}`)} + intent, err := tr.Match(context.Background(), plugin.Deps{}, wl, + plugin.InboundEvent{Kind: "schedule"}) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if intent != nil { + t.Fatalf("expected nil intent when payload missing, got %+v", intent) + } +} + +func TestMatch_SchedulePopulatesIntent(t *testing.T) { + tr := &trigger{} + wl := plugin.Workload{ + ID: "w1", + TriggerConfig: json.RawMessage(`{"interval":"6h","reference":"stable"}`), + } + fixed := time.Date(2026, 5, 16, 12, 0, 0, 0, time.UTC) + intent, err := tr.Match(context.Background(), plugin.Deps{}, wl, + plugin.InboundEvent{ + Kind: "schedule", + Schedule: &plugin.ScheduleEvent{FiredAt: fixed}, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if intent == nil { + t.Fatalf("expected intent") + } + if intent.Reason != "schedule" { + t.Errorf("Reason = %q, want schedule", intent.Reason) + } + if intent.Reference != "stable" { + t.Errorf("Reference = %q, want stable", intent.Reference) + } + if intent.TriggeredBy != "scheduler" { + t.Errorf("TriggeredBy = %q, want scheduler", intent.TriggeredBy) + } + if !intent.TriggeredAt.Equal(fixed) { + t.Errorf("TriggeredAt = %s, want %s", intent.TriggeredAt, fixed) + } + if intent.Metadata["interval"] != "6h" { + t.Errorf("interval metadata = %q, want 6h", intent.Metadata["interval"]) + } +} + +func TestMatch_ZeroFiredAtFallsBackToNow(t *testing.T) { + tr := &trigger{} + wl := plugin.Workload{ + ID: "w1", + TriggerConfig: json.RawMessage(`{"interval":"1h"}`), + } + before := time.Now().UTC() + intent, err := tr.Match(context.Background(), plugin.Deps{}, wl, + plugin.InboundEvent{ + Kind: "schedule", + Schedule: &plugin.ScheduleEvent{}, + }) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + after := time.Now().UTC() + if intent.TriggeredAt.Before(before) || intent.TriggeredAt.After(after.Add(time.Second)) { + t.Errorf("TriggeredAt %s not in [%s, %s]", intent.TriggeredAt, before, after) + } +} + +func TestKindAndSchemaSample(t *testing.T) { + tr := &trigger{} + if tr.Kind() != "schedule" { + t.Fatalf("Kind = %q, want schedule", tr.Kind()) + } + type withSample interface{ SchemaSample() any } + s, ok := any(tr).(withSample) + if !ok { + t.Fatalf("trigger does not expose SchemaSample()") + } + sample, ok := s.SchemaSample().(Config) + if !ok { + t.Fatalf("SchemaSample is not Config: %T", s.SchemaSample()) + } + if _, err := ParseInterval(sample.Interval); err != nil { + t.Errorf("SchemaSample.Interval is not parseable: %v", err) + } +} diff --git a/internal/workload/plugin/types.go b/internal/workload/plugin/types.go index 81a6bca..2c45c0a 100644 --- a/internal/workload/plugin/types.go +++ b/internal/workload/plugin/types.go @@ -29,18 +29,19 @@ type PublicFace struct { EnableSSL bool } -// InboundEvent is what an upstream signal (webhook, poll, manual click) -// looks like to a Trigger.Match call. Triggers consult Kind first to -// decide whether the event is interesting, then read the matching payload -// field. RawBody / Headers are kept so trigger plugins can perform their -// own signature verification or vendor-specific parsing. +// InboundEvent is what an upstream signal (webhook, poll, manual click, +// scheduler tick) looks like to a Trigger.Match call. Triggers consult +// Kind first to decide whether the event is interesting, then read the +// matching payload field. RawBody / Headers are kept so trigger plugins +// can perform their own signature verification or vendor-specific parsing. type InboundEvent struct { - Kind string // "image-push" | "git-push" | "git-tag" | "manual" | "cron-tick" - Image *ImagePushEvent - Git *GitEvent - Manual *ManualEvent - RawBody []byte - Headers map[string][]string + Kind string // "image-push" | "git-push" | "git-tag" | "manual" | "schedule" + Image *ImagePushEvent + Git *GitEvent + Manual *ManualEvent + Schedule *ScheduleEvent + RawBody []byte + Headers map[string][]string } // ImagePushEvent is normalized across registry vendors (generic, Gitea, @@ -72,6 +73,14 @@ type ManualEvent struct { Note string } +// ScheduleEvent is fired by the internal scheduler when a schedule +// trigger's next-fire window is reached. FiredAt is the wall-clock the +// scheduler observed (already truncated to the second). The trigger +// plugin uses FiredAt + its own config to populate the DeploymentIntent. +type ScheduleEvent struct { + FiredAt time.Time +} + // SourceConfigOf decodes the workload's SourceConfig blob into the typed // shape a specific Source uses. Kept here so callers do not duplicate the // boilerplate. diff --git a/web/src/lib/api.ts b/web/src/lib/api.ts index 8467ab5..a87baf1 100644 --- a/web/src/lib/api.ts +++ b/web/src/lib/api.ts @@ -719,6 +719,9 @@ export interface RedeployTrigger { webhook_enabled: boolean; webhook_require_signature: boolean; binding_count: number; + /** RFC3339 timestamp the scheduler last dispatched this trigger. Empty for + * never-fired or non-scheduler-driven triggers. */ + last_fired_at: string; created_at: string; updated_at: string; } diff --git a/web/src/lib/components/TriggerKindForm.svelte b/web/src/lib/components/TriggerKindForm.svelte index 1a0d487..22a6a4c 100644 --- a/web/src/lib/components/TriggerKindForm.svelte +++ b/web/src/lib/components/TriggerKindForm.svelte @@ -18,9 +18,17 @@