// Package scheduler drives the "schedule" trigger kind. It ticks on a // fixed interval, scans every enabled schedule trigger, and dispatches // the ones whose next-fire window has elapsed through the same // FanOutForTrigger path the inbound HTTP webhook uses. // // The scheduler is intentionally simple: // // - Tick on `tickInterval` (default 30s). // - For every trigger with Kind=="schedule", parse its config to get // the interval, compute (LastFiredAt + interval), and if now >= // that target, fire. // - On fire: build a plugin.InboundEvent{Kind: "schedule"} and call // handler.FanOutForTrigger. last_fired_at is persisted BEFORE the // dispatch runs so a panicking Match cannot wedge the row into a // tight retry loop — a failed deploy waits one full interval // before retry, which is the correct trade-off for a periodic // refresh trigger. // - A never-fired trigger (LastFiredAt == "") fires on the next // tick — operator-friendly for testing "did I configure it right?". // // Per-trigger errors are logged but do not abort the tick. package scheduler import ( "context" "log/slog" "sync" "time" "github.com/alexei/tinyforge/internal/metrics" "github.com/alexei/tinyforge/internal/store" "github.com/alexei/tinyforge/internal/workload/plugin" "github.com/alexei/tinyforge/internal/workload/plugin/trigger/schedule" ) // Scheduler owns the background tick loop. type Scheduler struct { store *store.Store dispatcher fanOutFn tickInterval time.Duration clock func() time.Time // overridable for tests startOnce sync.Once stopOnce sync.Once cancel context.CancelFunc wg sync.WaitGroup } // fanOutFn is the internal callback shape — narrower than the public // FanOutTrigger interface so the wiring in cmd/server/main.go can pass // a closure directly without standing up a wrapper type. type fanOutFn func(ctx context.Context, trg store.Trigger, evt plugin.InboundEvent) error // New constructs a Scheduler bound to `st` that dispatches via `fanOut`. // `tickInterval` controls how often the loop wakes up to check // schedules; values <=0 fall back to 30s. Tick intervals longer than 5 // minutes are clamped so a misconfigured value can't silently disable // schedules. // // `fanOut` should call webhook.Handler.FanOutForTrigger and return its // error (or nil); the per-binding result slice is discarded — the // scheduler does not need to know per-binding outcomes, only whether // the dispatch itself failed. func New(st *store.Store, fanOut fanOutFn, tickInterval time.Duration) *Scheduler { clamped := tickInterval if clamped <= 0 { clamped = 30 * time.Second } if clamped > 5*time.Minute { clamped = 5 * time.Minute } if clamped != tickInterval && tickInterval != 0 { slog.Warn("scheduler: tick interval clamped", "requested", tickInterval, "applied", clamped) } return &Scheduler{ store: st, dispatcher: fanOut, tickInterval: clamped, clock: func() time.Time { return time.Now().UTC() }, } } // Start launches the loop. Idempotent — repeat calls are no-ops, not // goroutine leaks. Mirrors the reconciler's lifecycle. func (s *Scheduler) Start(ctx context.Context) { s.startOnce.Do(func() { ctx, cancel := context.WithCancel(ctx) s.cancel = cancel s.wg.Add(1) go s.loop(ctx) }) } // Stop cancels the context and waits for the in-flight tick. Idempotent // via sync.Once — second call returns immediately without panicking on // a double cancel. func (s *Scheduler) Stop() { s.stopOnce.Do(func() { if s.cancel != nil { s.cancel() } }) s.wg.Wait() } func (s *Scheduler) loop(ctx context.Context) { defer s.wg.Done() // First sweep at boot so a daily schedule does not idle 24h after a // restart before it picks up rows whose window already elapsed. s.TickOnce(ctx) ticker := time.NewTicker(s.tickInterval) defer ticker.Stop() for { select { case <-ctx.Done(): return case <-ticker.C: s.TickOnce(ctx) } } } // TickOnce runs a single sweep. Exposed for tests and for the boot // kick. On error per-trigger the loop continues with the next row. func (s *Scheduler) TickOnce(ctx context.Context) { metrics.SchedulerTicksTotal.Inc() rows, err := s.store.ListTriggers("schedule") if err != nil { slog.Warn("scheduler: list triggers", "error", err) return } now := s.clock() for _, t := range rows { if !s.shouldFire(t, now) { continue } s.fire(ctx, t, now) } } // shouldFire decides whether to dispatch trg at `now`. Returns true if: // - the trigger's interval is parseable, AND // - last_fired_at is empty (never fired) OR now >= lastFired + interval. // // Unparseable last_fired_at or interval are logged once and treated as // "do not fire" — the operator needs to fix the config; the scheduler // must not loop on a broken row. func (s *Scheduler) shouldFire(t store.Trigger, now time.Time) bool { interval, err := schedule.IntervalOfRaw(t.Config) if err != nil { slog.Warn("scheduler: bad interval", "trigger", t.Name, "error", err) return false } // Defense-in-depth against a hand-inserted row that bypassed // Validate (manual SQL, restore, ad-hoc migration). Validate // already enforces the floor on the create path; this re-check // keeps the loop honest if anything sneaks past it. if interval < schedule.MinInterval { slog.Warn("scheduler: interval below minimum, ignoring", "trigger", t.Name, "interval", interval, "minimum", schedule.MinInterval) return false } if t.LastFiredAt == "" { return true } last, err := time.Parse(time.RFC3339, t.LastFiredAt) if err != nil { slog.Warn("scheduler: bad last_fired_at", "trigger", t.Name, "value", t.LastFiredAt, "error", err) // Treat as never-fired so the operator's fix-by-redeploy doesn't // require a manual DB poke. return true } if now.Before(last.Add(interval)) { return false } // Catch-up warning: a trigger whose last_fired_at is many intervals // old (paused-then-resumed, restored from backup, or just left // running while the dispatcher was down) WILL fire on this tick. // Log a one-line warning so the operator can recognize the "surprise // burst at restart" pattern in audit logs. We still fire — silent // no-fire would be worse — but the warning explains why. if overdue := now.Sub(last); overdue > catchUpWarnThreshold*interval { slog.Warn("scheduler: catch-up fire (very overdue)", "trigger", t.Name, "overdue", overdue, "interval", interval) } return true } // catchUpWarnThreshold is the multiplier on `interval` past which a // fire is logged as "catch-up." 2× means a daily schedule whose last // fire was more than 48h ago gets a warning at next tick. Chosen so // the warning fires on "wedged for many intervals" without alerting on // the every-tick lag a healthy 30s-tick scheduler accumulates against // a sub-minute interval. Bigger threshold = noisier-quiet trade-off; // 2× is the smallest value that excludes single-tick lag. const catchUpWarnThreshold = 2 // fire dispatches one trigger and records the new last_fired_at. // // We persist last_fired_at BEFORE calling the dispatcher so a panic // inside Match cannot wedge the row into a tight loop. Down-side: a // deploy that fails leaves the scheduler waiting one full interval // before retry — acceptable because the trigger is a periodic refresh, // not a critical-path retry mechanism. func (s *Scheduler) fire(ctx context.Context, t store.Trigger, now time.Time) { // Belt-and-suspenders: ListTriggersByKind only returns "schedule" // rows, but if a future caller wires fire() differently this guard // keeps the scheduler from blindly dispatching a kind it isn't // designed for. if t.Kind != "schedule" { slog.Warn("scheduler: refusing to fire non-schedule kind", "trigger", t.Name, "kind", t.Kind) return } ts := now.Format(time.RFC3339) if err := s.store.SetTriggerLastFired(t.ID, ts); err != nil { slog.Warn("scheduler: persist last_fired_at", "trigger", t.Name, "error", err) return } evt := plugin.InboundEvent{ Kind: "schedule", Schedule: &plugin.ScheduleEvent{FiredAt: now}, } if err := s.dispatcher(ctx, t, evt); err != nil { slog.Warn("scheduler: dispatch", "trigger", t.Name, "error", err) return } metrics.SchedulerDispatchedTotal.Inc() slog.Info("scheduler: fired", "trigger", t.Name, "kind", t.Kind, "at", ts) }