5e78f13e06
Build / build (push) Failing after 34s
Follow-ups on commit 39e1e36 addressing review feedback from
go-reviewer / security-reviewer / typescript-reviewer.
Backend:
- New POST /api/triggers/{id}/fire (AdminOnly, schedule-only): backs the
operator "Fire now" button — dispatches immediately without waiting for the
next natural interval. Persists last_fired_at BEFORE dispatch, the same
ordering as the scheduler. A per-trigger in-flight guard (429 if a
fire is already running) defends against rapid double-clicks /
runaway scripts. Refuses the request when AdminOnly claims are absent
rather than logging an unattributable deploy.
- SetTriggerLastFired now validates that the timestamp parses as RFC3339
before writing. It rejects the empty string explicitly — the
"empty string clears the column" semantics were dead (no caller) and,
if ever written by accident, would silently re-fire the trigger on the
next tick. A future reset-cadence flow must add a dedicated
ClearTriggerLastFired so the call site is grep-able and separately
auditable.
- Scheduler logs WARN on catch-up fires (now - lastFired > 2× interval)
so the "surprise burst at restart" pattern shows up in audit logs.
- BindingResult reason strings extracted to package consts
(webhook.Reason*) so the scheduler and api fire-now classifications
stay in sync without string-matching drift.
- SECURITY NOTE on FanOutForTrigger documents that the
WebhookRequireSignature gate is ingress-only by design.
Frontend:
- Refactored /triggers/new (770 LOC → 155 LOC) and /triggers/[id]
(~350 LOC dropped) to use the shared TriggerKindForm. Eliminates the
triplicated per-kind state + buildConfig + canSubmit + template that
caused the d-unit regex drift in the prior commit.
- New seedTriggerKindFormState helper on TriggerKindForm primes the
form from a server-returned trigger config with defensive type
guards; resets per-kind slots first so re-seeding across kinds
doesn't inherit stale state.
- /triggers/[id] gains a Schedule status panel with Last Fired + Fire
Now button (gated on binding_count > 0). Confirmation dialog,
result flash, timer cleanup on unmount + new-fire (no stale-closure
race). EN+RU i18n parity.
231 lines
8.1 KiB
Go
231 lines
8.1 KiB
Go
// Package scheduler drives the "schedule" trigger kind. It ticks on a
// fixed interval, scans every enabled schedule trigger, and dispatches
// the ones whose next-fire window has elapsed through the same
// FanOutForTrigger path the inbound HTTP webhook uses.
//
// The scheduler is intentionally simple:
//
//   - Tick on tickInterval (default 30s, capped at 5m).
//   - For every trigger with Kind == "schedule", parse its config to get
//     the interval, compute LastFiredAt + interval, and fire once now is
//     at or past that target.
//   - On fire: build a plugin.InboundEvent{Kind: "schedule"} and call
//     handler.FanOutForTrigger. last_fired_at is persisted BEFORE the
//     dispatch runs so a panicking Match cannot wedge the row into a
//     tight retry loop — a failed deploy waits one full interval
//     before retry, which is the correct trade-off for a periodic
//     refresh trigger.
//   - A never-fired trigger (LastFiredAt == "") fires on the next
//     tick — operator-friendly for testing "did I configure it right?".
//
// Per-trigger errors are logged but do not abort the tick.
package scheduler
|
||
|
||
import (
|
||
"context"
|
||
"log/slog"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/alexei/tinyforge/internal/store"
|
||
"github.com/alexei/tinyforge/internal/workload/plugin"
|
||
"github.com/alexei/tinyforge/internal/workload/plugin/trigger/schedule"
|
||
)
|
||
|
||
// Scheduler owns the background tick loop.
|
||
type Scheduler struct {
|
||
store *store.Store
|
||
dispatcher fanOutFn
|
||
tickInterval time.Duration
|
||
clock func() time.Time // overridable for tests
|
||
|
||
startOnce sync.Once
|
||
stopOnce sync.Once
|
||
cancel context.CancelFunc
|
||
wg sync.WaitGroup
|
||
}
|
||
|
||
// fanOutFn is the internal callback shape — narrower than the public
|
||
// FanOutTrigger interface so the wiring in cmd/server/main.go can pass
|
||
// a closure directly without standing up a wrapper type.
|
||
type fanOutFn func(ctx context.Context, trg store.Trigger, evt plugin.InboundEvent) error
|
||
|
||
// New constructs a Scheduler bound to `st` that dispatches via `fanOut`.
|
||
// `tickInterval` controls how often the loop wakes up to check
|
||
// schedules; values <=0 fall back to 30s. Tick intervals longer than 5
|
||
// minutes are clamped so a misconfigured value can't silently disable
|
||
// schedules.
|
||
//
|
||
// `fanOut` should call webhook.Handler.FanOutForTrigger and return its
|
||
// error (or nil); the per-binding result slice is discarded — the
|
||
// scheduler does not need to know per-binding outcomes, only whether
|
||
// the dispatch itself failed.
|
||
func New(st *store.Store, fanOut fanOutFn, tickInterval time.Duration) *Scheduler {
|
||
clamped := tickInterval
|
||
if clamped <= 0 {
|
||
clamped = 30 * time.Second
|
||
}
|
||
if clamped > 5*time.Minute {
|
||
clamped = 5 * time.Minute
|
||
}
|
||
if clamped != tickInterval && tickInterval != 0 {
|
||
slog.Warn("scheduler: tick interval clamped",
|
||
"requested", tickInterval, "applied", clamped)
|
||
}
|
||
return &Scheduler{
|
||
store: st,
|
||
dispatcher: fanOut,
|
||
tickInterval: clamped,
|
||
clock: func() time.Time { return time.Now().UTC() },
|
||
}
|
||
}
|
||
|
||
// Start launches the loop. Idempotent — repeat calls are no-ops, not
|
||
// goroutine leaks. Mirrors the reconciler's lifecycle.
|
||
func (s *Scheduler) Start(ctx context.Context) {
|
||
s.startOnce.Do(func() {
|
||
ctx, cancel := context.WithCancel(ctx)
|
||
s.cancel = cancel
|
||
s.wg.Add(1)
|
||
go s.loop(ctx)
|
||
})
|
||
}
|
||
|
||
// Stop cancels the context and waits for the in-flight tick. Idempotent
|
||
// via sync.Once — second call returns immediately without panicking on
|
||
// a double cancel.
|
||
func (s *Scheduler) Stop() {
|
||
s.stopOnce.Do(func() {
|
||
if s.cancel != nil {
|
||
s.cancel()
|
||
}
|
||
})
|
||
s.wg.Wait()
|
||
}
|
||
|
||
func (s *Scheduler) loop(ctx context.Context) {
|
||
defer s.wg.Done()
|
||
// First sweep at boot so a daily schedule does not idle 24h after a
|
||
// restart before it picks up rows whose window already elapsed.
|
||
s.TickOnce(ctx)
|
||
|
||
ticker := time.NewTicker(s.tickInterval)
|
||
defer ticker.Stop()
|
||
for {
|
||
select {
|
||
case <-ctx.Done():
|
||
return
|
||
case <-ticker.C:
|
||
s.TickOnce(ctx)
|
||
}
|
||
}
|
||
}
|
||
|
||
// TickOnce runs a single sweep. Exposed for tests and for the boot
|
||
// kick. On error per-trigger the loop continues with the next row.
|
||
func (s *Scheduler) TickOnce(ctx context.Context) {
|
||
rows, err := s.store.ListTriggers("schedule")
|
||
if err != nil {
|
||
slog.Warn("scheduler: list triggers", "error", err)
|
||
return
|
||
}
|
||
now := s.clock()
|
||
for _, t := range rows {
|
||
if !s.shouldFire(t, now) {
|
||
continue
|
||
}
|
||
s.fire(ctx, t, now)
|
||
}
|
||
}
|
||
|
||
// shouldFire decides whether to dispatch trg at `now`. Returns true if:
|
||
// - the trigger's interval is parseable, AND
|
||
// - last_fired_at is empty (never fired) OR now >= lastFired + interval.
|
||
//
|
||
// Unparseable last_fired_at or interval are logged once and treated as
|
||
// "do not fire" — the operator needs to fix the config; the scheduler
|
||
// must not loop on a broken row.
|
||
func (s *Scheduler) shouldFire(t store.Trigger, now time.Time) bool {
|
||
interval, err := schedule.IntervalOfRaw(t.Config)
|
||
if err != nil {
|
||
slog.Warn("scheduler: bad interval", "trigger", t.Name, "error", err)
|
||
return false
|
||
}
|
||
// Defense-in-depth against a hand-inserted row that bypassed
|
||
// Validate (manual SQL, restore, ad-hoc migration). Validate
|
||
// already enforces the floor on the create path; this re-check
|
||
// keeps the loop honest if anything sneaks past it.
|
||
if interval < schedule.MinInterval {
|
||
slog.Warn("scheduler: interval below minimum, ignoring",
|
||
"trigger", t.Name, "interval", interval, "minimum", schedule.MinInterval)
|
||
return false
|
||
}
|
||
if t.LastFiredAt == "" {
|
||
return true
|
||
}
|
||
last, err := time.Parse(time.RFC3339, t.LastFiredAt)
|
||
if err != nil {
|
||
slog.Warn("scheduler: bad last_fired_at", "trigger", t.Name,
|
||
"value", t.LastFiredAt, "error", err)
|
||
// Treat as never-fired so the operator's fix-by-redeploy doesn't
|
||
// require a manual DB poke.
|
||
return true
|
||
}
|
||
if now.Before(last.Add(interval)) {
|
||
return false
|
||
}
|
||
// Catch-up warning: a trigger whose last_fired_at is many intervals
|
||
// old (paused-then-resumed, restored from backup, or just left
|
||
// running while the dispatcher was down) WILL fire on this tick.
|
||
// Log a one-line warning so the operator can recognize the "surprise
|
||
// burst at restart" pattern in audit logs. We still fire — silent
|
||
// no-fire would be worse — but the warning explains why.
|
||
if overdue := now.Sub(last); overdue > catchUpWarnThreshold*interval {
|
||
slog.Warn("scheduler: catch-up fire (very overdue)",
|
||
"trigger", t.Name, "overdue", overdue, "interval", interval)
|
||
}
|
||
return true
|
||
}
|
||
|
||
// catchUpWarnThreshold is the multiple of a trigger's interval beyond
// which shouldFire logs a due fire as a "catch-up". With 2, a daily
// schedule whose last fire was more than 48h ago is flagged on its next
// tick.
//
// In a healthy scheduler, a due trigger is noticed within one tick.
// The time since its last fire then stays around interval + tickInterval
// (plus any dispatch time spent earlier in the same sweep). That is at
// most 2×interval whenever interval >= tickInterval.
//
// A larger multiplier is quieter but flags a genuinely stalled schedule
// later. 2 is the smallest value that stays silent for ordinary one-tick
// lag.
const catchUpWarnThreshold = 2
|
||
|
||
// fire dispatches one trigger and records the new last_fired_at.
|
||
//
|
||
// We persist last_fired_at BEFORE calling the dispatcher so a panic
|
||
// inside Match cannot wedge the row into a tight loop. Down-side: a
|
||
// deploy that fails leaves the scheduler waiting one full interval
|
||
// before retry — acceptable because the trigger is a periodic refresh,
|
||
// not a critical-path retry mechanism.
|
||
func (s *Scheduler) fire(ctx context.Context, t store.Trigger, now time.Time) {
|
||
// Belt-and-suspenders: ListTriggersByKind only returns "schedule"
|
||
// rows, but if a future caller wires fire() differently this guard
|
||
// keeps the scheduler from blindly dispatching a kind it isn't
|
||
// designed for.
|
||
if t.Kind != "schedule" {
|
||
slog.Warn("scheduler: refusing to fire non-schedule kind",
|
||
"trigger", t.Name, "kind", t.Kind)
|
||
return
|
||
}
|
||
ts := now.Format(time.RFC3339)
|
||
if err := s.store.SetTriggerLastFired(t.ID, ts); err != nil {
|
||
slog.Warn("scheduler: persist last_fired_at", "trigger", t.Name, "error", err)
|
||
return
|
||
}
|
||
evt := plugin.InboundEvent{
|
||
Kind: "schedule",
|
||
Schedule: &plugin.ScheduleEvent{FiredAt: now},
|
||
}
|
||
if err := s.dispatcher(ctx, t, evt); err != nil {
|
||
slog.Warn("scheduler: dispatch", "trigger", t.Name, "error", err)
|
||
return
|
||
}
|
||
slog.Info("scheduler: fired", "trigger", t.Name, "kind", t.Kind, "at", ts)
|
||
}
|