Files
tiny-forge/internal/notify/notifier.go
T
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

359 lines
12 KiB
Go

package notify
import (
"bytes"
"context"
"crypto/hmac"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"net/url"
"sync"
"time"
"github.com/google/uuid"
)
// Event is the JSON body of a deployment or site-sync webhook.
//
// Field names and JSON keys are kept stable for backwards compatibility with
// the original deploy_success/deploy_failure receivers. Site events carry the
// site name in Project and leave Stage and ImageTag empty.
type Event struct {
	// Type is the event kind: deploy_success, deploy_failure,
	// site_sync_success, site_sync_failure or test. doSend also forwards it
	// as the X-Tinyforge-Event header value.
	Type      string `json:"type"`
	Project   string `json:"project"`
	Stage     string `json:"stage"`
	ImageTag  string `json:"image_tag"`
	Subdomain string `json:"subdomain"`
	URL       string `json:"url,omitempty"`
	Error     string `json:"error,omitempty"`
	// Timestamp is set to the current UTC time (RFC 3339) by SendSigned and
	// SendSyncForTest when the caller leaves it empty.
	Timestamp string `json:"timestamp"`
}
// Tier names the configuration layer whose URL and secret were used for a
// dispatch. It appears in log lines, in the X-Tinyforge-Tier header and in
// TestResult, so operators can see which layer a fall-through resolved to.
type Tier string

// Known tiers. TierEventTrigger is the tier SendPayload always uses.
const (
	TierSettings     Tier = "settings"
	TierProject      Tier = "project"
	TierStage        Tier = "stage"
	TierSite         Tier = "site"
	TierEventTrigger Tier = "event_trigger"
)
// HTTP header names attached to every outgoing webhook.
//
// The signature header deliberately reuses the GitHub/Gitea/Forgejo name so
// receivers written for those providers (including the
// service-to-notification-bridge generic webhook provider) can verify
// deliveries unchanged. Only the body is covered by the HMAC; the
// X-Tinyforge-* headers are informational and unsigned.
const (
	// HeaderSignature carries "sha256=<lowercase hex HMAC>" when a secret is set.
	HeaderSignature = "X-Hub-Signature-256"
	// HeaderEvent carries the event type string.
	HeaderEvent = "X-Tinyforge-Event"
	// HeaderDelivery carries a per-delivery UUID.
	HeaderDelivery = "X-Tinyforge-Delivery"
	// HeaderTimestamp carries the send timestamp.
	HeaderTimestamp = "X-Tinyforge-Timestamp"
	// HeaderTier carries the Tier that supplied the URL and secret.
	HeaderTier = "X-Tinyforge-Tier"
)
// userAgent identifies Tinyforge in receivers' access logs. It is a fixed
// string for now; a build version may be appended once build-time variables
// are wired in.
const userAgent = "Tinyforge-Webhook/1"
// TestResult is the JSON answer of the notification-test endpoints. It tells
// the UI how the receiver responded and whether this tier signs its payloads.
type TestResult struct {
	URL             string `json:"url"`              // target that was called
	Tier            Tier   `json:"tier"`             // layer that supplied URL and secret
	StatusCode      int    `json:"status_code"`      // receiver HTTP status; 0 if no response
	LatencyMs       int64  `json:"latency_ms"`       // wall-clock round trip in milliseconds
	SignatureSent   bool   `json:"signature_sent"`   // true when a non-empty secret was used
	DeliveryID      string `json:"delivery_id"`      // value of the X-Tinyforge-Delivery header
	ResponseSnippet string `json:"response_snippet"` // leading bytes of the response body
	Error           string `json:"error,omitempty"`  // failure description, if any
}
// Notifier delivers outgoing webhooks for deploy, site-sync and event-trigger
// notifications.
//
// Send, SendSigned and SendPayload are fire-and-forget: they return at once,
// deliver on a background goroutine and only log failures. Drain waits for
// those goroutines. SendSyncForTest and SendSyncForTestPayload are the
// synchronous exceptions, used by the manual test endpoints. Construct with
// New so the HTTP client is initialised.
type Notifier struct {
	httpClient *http.Client   // shared client for every delivery
	wg         sync.WaitGroup // counts in-flight background deliveries
}
// New returns a Notifier whose HTTP client gives up on any single request
// after 10 seconds.
//
// NOTE(review): CheckRedirect is left nil, so net/http's default policy
// follows redirects. Confirm that is acceptable given the API-layer SSRF
// guard on configured URLs.
func New() *Notifier {
	const requestTimeout = 10 * time.Second
	client := &http.Client{Timeout: requestTimeout}
	return &Notifier{httpClient: client}
}
// Drain blocks until every background delivery started by Send, SendSigned
// or SendPayload has finished (each registers itself on n.wg before it
// starts).
func (n *Notifier) Drain() {
	wg := &n.wg
	wg.Wait()
}
// Send delivers event to webhookURL in the background without a signature,
// attributing the dispatch to TierSettings. It exists for callsites that have
// no signing secret; new code should call SendSigned, which also records the
// resolution tier.
func (n *Notifier) Send(webhookURL string, event Event) {
	const noSecret = ""
	n.SendSigned(webhookURL, noSecret, TierSettings, event)
}
// SendSigned delivers event to webhookURL on a background goroutine.
//
// When secret is non-empty the exact JSON bytes on the wire are signed with
// HMAC-SHA256 and the digest is sent in X-Hub-Signature-256, so receivers
// must verify the raw body rather than a re-encoded copy. An empty secret
// produces an unsigned request, matching receivers that predate HMAC
// support.
//
// An empty webhookURL is a no-op. A missing event.Timestamp is set to the
// current UTC time (RFC 3339). Each delivery has its own 10-second context.
// The outcome is only logged (warn on failure, info on success). Drain waits
// for the goroutine.
func (n *Notifier) SendSigned(webhookURL, secret string, tier Tier, event Event) {
	if webhookURL == "" {
		return
	}
	if event.Timestamp == "" {
		event.Timestamp = time.Now().UTC().Format(time.RFC3339)
	}
	deliveryID := uuid.NewString()
	signed := secret != ""

	n.wg.Add(1)
	go func() {
		defer n.wg.Done()

		sendCtx, cancelSend := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancelSend()

		_, sendErr := n.doSend(sendCtx, webhookURL, secret, tier, deliveryID, event)

		// Log the host only; the secret and the full URL (which may hold
		// user-info) must never reach the logs.
		attrs := []any{
			"tier", tier,
			"host", safeHost(webhookURL),
			"delivery", deliveryID,
			"event", event.Type,
			"signed", signed,
		}
		if sendErr != nil {
			slog.Warn("notify: webhook send failed", append(attrs, "error", sendErr)...)
			return
		}
		slog.Info("notify: webhook dispatched", attrs...)
	}()
}
// SendPayload delivers an arbitrary JSON-serialisable payload to webhookURL
// on a background goroutine, attributed to TierEventTrigger. The event
// trigger dispatcher uses it for the event-log → trigger filter → webhook
// path. eventType goes in the X-Tinyforge-Event header so receivers can
// route without parsing the body. The body is HMAC-SHA256 signed when secret
// is non-empty.
//
// Fire-and-forget: an empty webhookURL is a no-op, each delivery has its own
// 10-second context, and failures are logged at warn without reaching the
// caller. Delivery problems therefore cannot back-pressure event publishing.
// Drain waits for the goroutine.
func (n *Notifier) SendPayload(webhookURL, secret, eventType string, payload any) {
	if webhookURL == "" {
		return
	}
	deliveryID := uuid.NewString()
	sentAt := time.Now().UTC().Format(time.RFC3339)
	signed := secret != ""

	n.wg.Add(1)
	go func() {
		defer n.wg.Done()

		sendCtx, cancelSend := context.WithTimeout(context.Background(), 10*time.Second)
		defer cancelSend()

		_, sendErr := n.doSendRaw(sendCtx, webhookURL, secret, TierEventTrigger, deliveryID, eventType, sentAt, payload)

		// Host only in logs — never the secret or the full URL.
		attrs := []any{
			"tier", TierEventTrigger,
			"host", safeHost(webhookURL),
			"delivery", deliveryID,
			"event", eventType,
			"signed", signed,
		}
		if sendErr != nil {
			slog.Warn("notify: trigger webhook send failed", append(attrs, "error", sendErr)...)
			return
		}
		slog.Info("notify: trigger webhook dispatched", attrs...)
	}()
}
// SendSyncForTest sends event once, synchronously, for the "Send test" UI
// button. It reports the receiver's answer so the operator can confirm the
// wiring without reading server logs.
//
// Failures are reported in TestResult.Error rather than as a Go error, which
// keeps the handler simple. When the receiver answered with a non-2xx status,
// both the status and the body preview are still filled in. An empty
// webhookURL yields a result with Error set and no request made. A missing
// event.Timestamp is set to the current UTC time (RFC 3339).
func (n *Notifier) SendSyncForTest(ctx context.Context, webhookURL, secret string, tier Tier, event Event) TestResult {
	if event.Timestamp == "" {
		event.Timestamp = time.Now().UTC().Format(time.RFC3339)
	}
	res := TestResult{
		URL:           webhookURL,
		Tier:          tier,
		SignatureSent: secret != "",
		DeliveryID:    uuid.NewString(),
	}
	if webhookURL == "" {
		res.Error = "no webhook URL configured for this tier"
		return res
	}

	began := time.Now()
	reply, sendErr := n.doSend(ctx, webhookURL, secret, tier, res.DeliveryID, event)
	res.LatencyMs = time.Since(began).Milliseconds()

	// doSend returns the receiver's reply alongside the error for non-2xx
	// statuses, so surface it whenever it exists.
	if reply != nil {
		res.StatusCode = reply.StatusCode
		res.ResponseSnippet = reply.BodyPreview
	}
	if sendErr != nil {
		res.Error = sendErr.Error()
	}
	return res
}
// SendSyncForTestPayload is the arbitrary-payload twin of SendSyncForTest.
// It sends payload and eventType once, synchronously, through the same
// HTTP+HMAC core that normal dispatch uses, and returns a TestResult. The
// event-trigger /test endpoint relies on this so the receiver sees exactly
// the envelope it will get in production; testing a different shape would
// prove nothing.
//
// The header timestamp is the current UTC time (RFC 3339). An empty
// webhookURL yields a result with Error set and no request made.
func (n *Notifier) SendSyncForTestPayload(ctx context.Context, webhookURL, secret string, tier Tier, eventType string, payload any) TestResult {
	deliveryID := uuid.NewString()
	sentAt := time.Now().UTC().Format(time.RFC3339)
	res := TestResult{
		URL:           webhookURL,
		Tier:          tier,
		SignatureSent: secret != "",
		DeliveryID:    deliveryID,
	}
	if webhookURL == "" {
		res.Error = "no webhook URL configured for this tier"
		return res
	}

	began := time.Now()
	reply, sendErr := n.doSendRaw(ctx, webhookURL, secret, tier, deliveryID, eventType, sentAt, payload)
	res.LatencyMs = time.Since(began).Milliseconds()

	// A non-2xx answer comes back as (reply, error); report both parts.
	if reply != nil {
		res.StatusCode = reply.StatusCode
		res.ResponseSnippet = reply.BodyPreview
	}
	if sendErr != nil {
		res.Error = sendErr.Error()
	}
	return res
}
// sendResponse holds the parts of a receiver's reply worth showing to an
// operator: the HTTP status and a short body preview. It is a plain value
// rather than *http.Response, so nothing downstream can hold an unread body.
type sendResponse struct {
	StatusCode  int    // HTTP status returned by the receiver
	BodyPreview string // leading bytes of the response body
}
// doSend POSTs an Event through doSendRaw. The Event is the JSON body, and
// its Type and Timestamp become the X-Tinyforge-Event and
// X-Tinyforge-Timestamp headers.
//
// It returns the receiver's reply (also on non-2xx statuses) or an error.
// The body is serialised once and the HMAC is computed over those exact
// bytes, so receivers MUST verify the raw body, not a re-encoded copy.
func (n *Notifier) doSend(ctx context.Context, webhookURL, secret string, tier Tier, delivery string, event Event) (*sendResponse, error) {
	eventType, sentAt := event.Type, event.Timestamp
	return n.doSendRaw(ctx, webhookURL, secret, tier, delivery, eventType, sentAt, event)
}
// webhookPreviewLimit caps how many response-body bytes are captured into
// sendResponse.BodyPreview for display to the operator.
const webhookPreviewLimit = 4096

// webhookDrainLimit bounds how much of the remaining response body is read
// and discarded after the preview. Small bodies are consumed fully, so the
// HTTP transport can reuse the connection, while a receiver that streams a
// huge body cannot hold the delivery open indefinitely.
const webhookDrainLimit = 64 << 10

// doSendRaw is the shared HTTP+HMAC core behind every outgoing webhook.
//
// It JSON-encodes payload once, signs those exact bytes with HMAC-SHA256 when
// secret is non-empty, and POSTs them with the Tinyforge headers (event type,
// delivery ID, timestamp and tier). SendPayload and the test helpers use it
// directly so callers are not forced into the Event shape.
//
// Return values:
//   - (reply, nil) for a 2xx response;
//   - (reply, error) for any other status, so the test endpoint can still
//     show what the receiver said;
//   - (nil, error) when encoding, request construction or transport fails.
//
// Errors never contain the webhook URL. Configured URLs may embed credentials
// in user-info or tokens in the path and query, and these errors end up in
// logs and in API responses.
func (n *Notifier) doSendRaw(ctx context.Context, webhookURL, secret string, tier Tier, delivery, eventType, timestamp string, payload any) (*sendResponse, error) {
	body, err := json.Marshal(payload)
	if err != nil {
		return nil, fmt.Errorf("marshal notification: %w", err)
	}
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, webhookURL, bytes.NewReader(body))
	if err != nil {
		return nil, fmt.Errorf("create notification request: %w", redactURLError(err))
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("User-Agent", userAgent)
	req.Header.Set(HeaderEvent, eventType)
	req.Header.Set(HeaderDelivery, delivery)
	req.Header.Set(HeaderTimestamp, timestamp)
	req.Header.Set(HeaderTier, string(tier))
	if secret != "" {
		req.Header.Set(HeaderSignature, "sha256="+sign(secret, body))
	}

	resp, err := n.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("send notification: %w", redactURLError(err))
	}
	defer resp.Body.Close()

	// The preview is best-effort: a read error mid-body still leaves a valid
	// status code, which is what determines success.
	preview, _ := io.ReadAll(io.LimitReader(resp.Body, webhookPreviewLimit))
	// Drain a bounded remainder so the keep-alive connection can be reused;
	// failure here only costs connection reuse.
	_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, webhookDrainLimit))

	out := &sendResponse{
		StatusCode:  resp.StatusCode,
		BodyPreview: string(preview),
	}
	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
		return out, fmt.Errorf("notification webhook returned status %d", resp.StatusCode)
	}
	return out, nil
}

// redactURLError strips the *url.Error wrapper that url.Parse and
// http.Client.Do put around their failures. That wrapper's message embeds
// the target URL: url.Parse keeps it verbatim (password included), and
// http.Client redacts only the password. The underlying cause is returned
// instead. A plain type assertion is enough because both APIs return the
// *url.Error directly, without further wrapping.
func redactURLError(err error) error {
	if uerr, ok := err.(*url.Error); ok && uerr.Err != nil {
		return uerr.Err
	}
	return err
}
// sign computes HMAC-SHA256(secret, body) and returns it as lowercase hex.
// The caller adds the "sha256=" prefix required by the X-Hub-Signature-256
// wire format.
func sign(secret string, body []byte) string {
	key := []byte(secret)
	h := hmac.New(sha256.New, key)
	h.Write(body) // hash.Hash.Write never returns an error
	digest := h.Sum(nil)
	return hex.EncodeToString(digest)
}
// VerifySignature is the receiver-side counterpart to sign(). It reports
// whether signatureHeader is the HMAC-SHA256 of body keyed by secret. It is
// exported so this repo's tests (and any future incoming-webhook receiver)
// can share the construction instead of re-implementing it.
//
// signatureHeader may be the bare hex digest or the GitHub-style
// "sha256=<hex>" envelope. Hex digits may be in either case. An empty secret,
// an empty header, or a digest that is not valid 32-byte hex is rejected.
//
// The digest is decoded and compared as raw bytes, so an uppercase hex
// encoding of a correct MAC is accepted. The comparison uses hmac.Equal,
// which is constant-time; bytes.Equal would leak timing information about
// the first differing byte.
func VerifySignature(secret string, body []byte, signatureHeader string) bool {
	if secret == "" || signatureHeader == "" {
		return false
	}
	const prefix = "sha256="
	digestHex := signatureHeader
	if len(digestHex) > len(prefix) && digestHex[:len(prefix)] == prefix {
		digestHex = digestHex[len(prefix):]
	}
	got, err := hex.DecodeString(digestHex)
	if err != nil || len(got) != sha256.Size {
		return false
	}
	mac := hmac.New(sha256.New, []byte(secret))
	mac.Write(body) // hash.Hash.Write never returns an error
	return hmac.Equal(got, mac.Sum(nil))
}
// safeHost returns the host (and port, if any) of a webhook URL for use in
// log lines.
//
// It never echoes the raw input. Webhook URLs may embed credentials in the
// user-info section, or tokens in the path or query. So a URL that fails to
// parse, or has no host component (empty string, bare path, "host:port"
// without a scheme), is reported as the fixed placeholder "(unparseable)".
// url.URL.Host excludes user-info, so the value returned on success is safe
// to log.
func safeHost(raw string) string {
	u, err := url.Parse(raw)
	if err != nil || u.Host == "" {
		return "(unparseable)"
	}
	return u.Host
}