fix: harden security, fix concurrency bugs, and address review findings
Build / build (push) Successful in 11m42s
Build / build (push) Successful in 11m42s
Security: - rate limit /api/webhook routes per-IP and cap concurrent site syncs - global SSE connection cap (256) with new sse_gate - validate ?tail= and cap JSON log responses at 4 MiB - strip ANSI/CSI/OSC and control bytes from streamed log lines - redact webhook secret from request log middleware - scrub host details from /api/health for non-admin viewers - drop container_id from /api/system/stats/top for non-admins - generate webhook secrets via crypto/rand; require >=32 chars on insert - verify iid path consistency in streamContainerLogs - LimitReader on site webhook body; reject malformed non-empty bodies Concurrency / correctness: - stats collector: Stop() no longer hangs without Start(), semaphore acquired in parent loop so ctx cancellation short-circuits the queue, in-flight tick cancellable via shared base context, zero-ts guard - webhook handler: replace fire-and-forget goroutine with WaitGroup-tracked workers + Drain() wired into graceful shutdown - $derived(() => ...) mis-idiom fixed in ContainerStats / InstanceCard / ProjectCard (returned function instead of value) - SystemResourcesCard: rename `window` and `t` locals to avoid shadowing globalThis.window and the i18n `t` import Quality / performance: - replace O(n^2) insertion sort with sort.Slice in stats top - runMigrations only swallows duplicate-column / already-exists errors - PruneStatsSamplesBefore wrapped in a transaction - collapse N+1 in unusedImageStats / pruneImages to one ListAllInstances pass; surface DB errors instead of silently treating them as inactive - run Docker Info + DiskUsage in parallel via errgroup - container log SSE emits `: ping` heartbeat every 20 s - imageMatches case-insensitive on registry host (RFC behaviour) - log warning on invalid stage tag pattern instead of silent skip - reject malformed non-empty site webhook payloads Frontend / i18n: - shared formatBytes utility replaces three local copies - statsInterval store drives dynamic "no samples / collection disabled" 
copy across ContainerStats and SystemResourcesCard - top consumers row now shows owner_name (project/stage or site name) - drop seven `as any` casts on the Settings type; add cloudflare_api_token write-only field - move "Service status", "Docker daemon", "Docker unreachable", "Proxy unreachable", "reachable", and "Docker daemon is not reachable." strings into en/ru i18n bundles
This commit is contained in:
+138
-32
@@ -5,15 +5,38 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"regexp"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
)
|
||||
|
||||
// Limits and constants for the log endpoints.
|
||||
const (
|
||||
defaultLogTail = 200
|
||||
maxLogTail = 5000
|
||||
maxJSONLogBytes = 4 << 20 // 4 MiB cap for non-streaming log responses
|
||||
maxLogLineBytes = 1 << 20 // 1 MiB max line length for the bufio.Scanner
|
||||
logHeartbeatPeriod = 20 * time.Second
|
||||
)
|
||||
|
||||
// Terminal control sequence patterns. They are stripped from streamed log
// lines so a hostile container cannot inject terminal control sequences
// (cursor moves, hyperlink escapes, screen clears) into operator displays or
// pasted output.
//
// They are meant to be applied in the order OSC, CSI, control bytes: the first
// two remove complete escape sequences together with their payload, the last
// one removes whatever single control bytes are left over.
var (
	// ansiCSIPattern matches a complete CSI sequence: ESC '[' followed by
	// parameter bytes, intermediate bytes, and one final byte.
	ansiCSIPattern = regexp.MustCompile(`\x1b\[[0-9;?]*[ -/]*[@-~]`)
	// ansiOSCPattern matches a complete OSC sequence: ESC ']' up to its BEL
	// or ESC '\' terminator.
	ansiOSCPattern = regexp.MustCompile(`\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)`)
	// ctlBytePattern matches every C0 control byte except TAB (0x09) and LF
	// (0x0a), plus DEL (0x7f). ESC (0x1b) is deliberately included: an escape
	// that is not part of a complete CSI/OSC sequence (for example "ESC c",
	// or an OSC with no terminator) would otherwise pass through untouched
	// and still be interpreted by the terminal.
	ctlBytePattern = regexp.MustCompile(`[\x00-\x08\x0b-\x1f\x7f]`)
)
|
||||
|
||||
// listProjectImages handles GET /api/projects/{id}/images.
|
||||
// Returns all local Docker images matching the project's image reference.
|
||||
func (s *Server) listProjectImages(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -50,6 +73,8 @@ func (s *Server) listProjectImages(w http.ResponseWriter, r *http.Request) {
|
||||
// - tail: number of lines from end (default "200")
|
||||
// - follow: "true" to stream new lines in real-time
|
||||
func (s *Server) streamContainerLogs(w http.ResponseWriter, r *http.Request) {
|
||||
projectID := chi.URLParam(r, "id")
|
||||
stageID := chi.URLParam(r, "stage")
|
||||
instanceID := chi.URLParam(r, "iid")
|
||||
|
||||
inst, err := s.store.GetInstanceByID(instanceID)
|
||||
@@ -63,6 +88,14 @@ func (s *Server) streamContainerLogs(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Verify the instance actually belongs to the project/stage in the path.
|
||||
// Without this, a user could stream logs for any instance ID by guessing
|
||||
// it under the wrong project — defence-in-depth for future per-project ACLs.
|
||||
if inst.ProjectID != projectID || inst.StageID != stageID {
|
||||
respondNotFound(w, "instance")
|
||||
return
|
||||
}
|
||||
|
||||
if inst.ContainerID == "" {
|
||||
respondError(w, http.StatusBadRequest, "instance has no container")
|
||||
return
|
||||
@@ -80,10 +113,7 @@ func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request,
|
||||
return
|
||||
}
|
||||
|
||||
tail := r.URL.Query().Get("tail")
|
||||
if tail == "" {
|
||||
tail = "200"
|
||||
}
|
||||
tail := parseTailParam(r.URL.Query().Get("tail"))
|
||||
follow := r.URL.Query().Get("follow") == "true"
|
||||
|
||||
// Check if client accepts SSE.
|
||||
@@ -99,8 +129,10 @@ func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request,
|
||||
defer logReader.Close()
|
||||
|
||||
if !isSSE {
|
||||
// JSON mode: read all lines and return as array.
|
||||
scanner := bufio.NewScanner(logReader)
|
||||
// JSON mode: cap the total bytes read so a chatty container with
|
||||
// tail=large cannot exhaust server memory.
|
||||
scanner := bufio.NewScanner(io.LimitReader(logReader, maxJSONLogBytes))
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
|
||||
var lines []string
|
||||
for scanner.Scan() {
|
||||
line := sanitizeDockerLogLine(scanner.Text())
|
||||
@@ -116,6 +148,12 @@ func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request,
|
||||
}
|
||||
|
||||
// SSE mode: stream lines as they arrive.
|
||||
release, ok := acquireSSESlot(w, s.sseGate)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
defer release()
|
||||
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
respondError(w, http.StatusInternalServerError, "streaming not supported")
|
||||
@@ -126,7 +164,31 @@ func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request,
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
|
||||
// Heartbeat keeps the connection warm through proxies that close idle
|
||||
// streams. Sent as an SSE comment which the EventSource API ignores.
|
||||
heartbeat := time.NewTicker(logHeartbeatPeriod)
|
||||
defer heartbeat.Stop()
|
||||
heartbeatDone := make(chan struct{})
|
||||
defer close(heartbeatDone)
|
||||
var hbMu sync.Mutex
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-heartbeat.C:
|
||||
hbMu.Lock()
|
||||
_, _ = io.WriteString(w, ": ping\n\n")
|
||||
flusher.Flush()
|
||||
hbMu.Unlock()
|
||||
case <-heartbeatDone:
|
||||
return
|
||||
case <-r.Context().Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
scanner := bufio.NewScanner(logReader)
|
||||
scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
|
||||
for scanner.Scan() {
|
||||
line := sanitizeDockerLogLine(scanner.Text())
|
||||
if line == "" {
|
||||
@@ -134,8 +196,10 @@ func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request,
|
||||
}
|
||||
|
||||
data, _ := json.Marshal(map[string]string{"line": line})
|
||||
hbMu.Lock()
|
||||
fmt.Fprintf(w, "data: %s\n\n", data)
|
||||
flusher.Flush()
|
||||
hbMu.Unlock()
|
||||
|
||||
// Check if client disconnected.
|
||||
select {
|
||||
@@ -146,17 +210,67 @@ func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request,
|
||||
}
|
||||
}
|
||||
|
||||
// parseTailParam validates and clamps the ?tail= query value. Empty/invalid
|
||||
// inputs fall back to the default; values above the cap are clamped down.
|
||||
// "all" is rejected — letting the caller request unbounded log history is a
|
||||
// trivial DoS vector.
|
||||
func parseTailParam(raw string) string {
|
||||
if raw == "" {
|
||||
return strconv.Itoa(defaultLogTail)
|
||||
}
|
||||
n, err := strconv.Atoi(raw)
|
||||
if err != nil || n <= 0 {
|
||||
return strconv.Itoa(defaultLogTail)
|
||||
}
|
||||
if n > maxLogTail {
|
||||
n = maxLogTail
|
||||
}
|
||||
return strconv.Itoa(n)
|
||||
}
|
||||
|
||||
// sanitizeDockerLogLine strips the Docker log stream header (8-byte prefix)
|
||||
// that Docker adds to non-TTY container logs.
|
||||
// that Docker adds to non-TTY container logs, and removes terminal control
|
||||
// sequences so a hostile container cannot inject ANSI escapes that hijack an
|
||||
// operator's terminal when log output is pasted or rendered raw.
|
||||
func sanitizeDockerLogLine(line string) string {
|
||||
// Docker multiplexed stream: first 8 bytes are header (stream type + size).
|
||||
// If the line starts with a non-printable byte followed by 0x00 0x00 0x00, strip 8 bytes.
|
||||
if len(line) > 8 && (line[0] == 1 || line[0] == 2) && line[1] == 0 && line[2] == 0 && line[3] == 0 {
|
||||
return line[8:]
|
||||
line = line[8:]
|
||||
}
|
||||
line = ansiOSCPattern.ReplaceAllString(line, "")
|
||||
line = ansiCSIPattern.ReplaceAllString(line, "")
|
||||
line = ctlBytePattern.ReplaceAllString(line, "")
|
||||
return line
|
||||
}
|
||||
|
||||
// buildActiveImagesSet returns the set of "image:tag" strings currently used
|
||||
// by any instance, computed in a single DB pass instead of N×K queries.
|
||||
// Returning an error (rather than swallowing) prevents prune logic from
|
||||
// treating a transient DB failure as "nothing is active".
|
||||
func buildActiveImagesSet(st *store.Store, projects []store.Project) (map[string]bool, error) {
|
||||
imageByProject := make(map[string]string, len(projects))
|
||||
for _, p := range projects {
|
||||
imageByProject[p.ID] = p.Image
|
||||
}
|
||||
instances, err := st.ListAllInstances()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("list instances: %w", err)
|
||||
}
|
||||
active := make(map[string]bool, len(instances))
|
||||
for _, inst := range instances {
|
||||
if inst.ImageTag == "" {
|
||||
continue
|
||||
}
|
||||
image := imageByProject[inst.ProjectID]
|
||||
if image == "" {
|
||||
continue
|
||||
}
|
||||
active[image+":"+inst.ImageTag] = true
|
||||
}
|
||||
return active, nil
|
||||
}
|
||||
|
||||
// unusedImageStats handles GET /api/docker/unused-images.
|
||||
// Returns the total size of unused project images and whether the threshold is exceeded.
|
||||
func (s *Server) unusedImageStats(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -181,18 +295,14 @@ func (s *Server) unusedImageStats(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Build set of active image refs.
|
||||
activeImages := make(map[string]bool)
|
||||
for _, p := range projects {
|
||||
stages, _ := s.store.GetStagesByProjectID(p.ID)
|
||||
for _, st := range stages {
|
||||
instances, _ := s.store.GetInstancesByStageID(st.ID)
|
||||
for _, inst := range instances {
|
||||
if inst.ImageTag != "" {
|
||||
activeImages[p.Image+":"+inst.ImageTag] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
// Build set of active image refs in one DB pass instead of N×K queries.
|
||||
// A flaky read here previously masqueraded as "no images are active",
|
||||
// which on the prune endpoint would have deleted *running* images.
|
||||
activeImages, err := buildActiveImagesSet(s.store, projects)
|
||||
if err != nil {
|
||||
slog.Error("unused images: build active set", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||||
return
|
||||
}
|
||||
|
||||
// Sum unused image sizes.
|
||||
@@ -242,18 +352,14 @@ func (s *Server) pruneImages(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Build a set of image refs used by active instances.
|
||||
activeImages := make(map[string]bool)
|
||||
for _, p := range projects {
|
||||
stages, _ := s.store.GetStagesByProjectID(p.ID)
|
||||
for _, st := range stages {
|
||||
instances, _ := s.store.GetInstancesByStageID(st.ID)
|
||||
for _, inst := range instances {
|
||||
if inst.ImageTag != "" {
|
||||
activeImages[p.Image+":"+inst.ImageTag] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
// Build a set of image refs used by active instances. Bail out on error
|
||||
// — silently treating a DB blip as "no active images" would prune
|
||||
// images currently in use by running containers.
|
||||
activeImages, err := buildActiveImagesSet(s.store, projects)
|
||||
if err != nil {
|
||||
slog.Error("prune: build active set", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||||
return
|
||||
}
|
||||
|
||||
// Collect all unique image bases from projects (without tags).
|
||||
|
||||
+64
-8
@@ -5,20 +5,57 @@ import (
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/auth"
|
||||
"github.com/alexei/tinyforge/internal/proxy"
|
||||
)
|
||||
|
||||
// healthProbeTimeout caps a single health probe so a stuck dependency does
|
||||
// not hold the polling endpoint open. The UI polls every 30 s, so 8 s leaves
|
||||
// headroom for the ping + Info + NPM list calls.
|
||||
const healthProbeTimeout = 8 * time.Second
|
||||
|
||||
// nonAdminDockerFields enumerates the fields any authenticated user is
|
||||
// allowed to see — version + connectivity + container counts. Host-detail
|
||||
// fields (kernel, root_dir, hostname, OS, storage driver) are admin-only to
|
||||
// avoid recon information leaks.
|
||||
var nonAdminDockerFields = map[string]bool{
|
||||
"connected": true,
|
||||
"latency_ms": true,
|
||||
"error": true,
|
||||
"version": true,
|
||||
"api_version": true,
|
||||
"containers": true,
|
||||
"running": true,
|
||||
"paused": true,
|
||||
"stopped": true,
|
||||
"images": true,
|
||||
"ncpu": true,
|
||||
"memory_total": true,
|
||||
}
|
||||
|
||||
// nonAdminProxyFields are the proxy fields safe to share with non-admins.
|
||||
// Configured URLs and aggregate counts of internal lists/certs are stripped.
|
||||
var nonAdminProxyFields = map[string]bool{
|
||||
"provider": true,
|
||||
"connected": true,
|
||||
"latency_ms": true,
|
||||
"error": true,
|
||||
"proxy_hosts_managed": true,
|
||||
}
|
||||
|
||||
// getHealth handles GET /api/health.
|
||||
//
|
||||
// Returns the connectivity state and (when connected) rich diagnostics for the
|
||||
// Docker daemon and the active proxy provider. This endpoint is polled by the
|
||||
// UI every 30 seconds — keep the calls cheap. The expensive NPM list calls
|
||||
// are only issued when the initial ping succeeds, so a down proxy never
|
||||
// amplifies latency.
|
||||
// Returns the connectivity state and (when connected) diagnostics for the
|
||||
// Docker daemon and the active proxy provider. Detailed host information
|
||||
// (kernel, root_dir, internal NPM URL, …) is stripped for non-admin users to
|
||||
// avoid leaking infrastructure details to read-only viewers.
|
||||
func (s *Server) getHealth(w http.ResponseWriter, r *http.Request) {
|
||||
ctx, cancel := context.WithTimeout(r.Context(), 8*time.Second)
|
||||
ctx, cancel := context.WithTimeout(r.Context(), healthProbeTimeout)
|
||||
defer cancel()
|
||||
|
||||
claims, _ := auth.ClaimsFromContext(r.Context())
|
||||
isAdmin := claims.Role == "admin"
|
||||
|
||||
now := time.Now().UTC().Format(time.RFC3339)
|
||||
result := map[string]any{
|
||||
"checked_at": now,
|
||||
@@ -32,16 +69,35 @@ func (s *Server) getHealth(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// ── Docker daemon ────────────────────────────────────────────────
|
||||
result["docker"] = s.dockerHealth(ctx)
|
||||
docker := s.dockerHealth(ctx)
|
||||
if !isAdmin {
|
||||
docker = filterFields(docker, nonAdminDockerFields)
|
||||
}
|
||||
result["docker"] = docker
|
||||
|
||||
// ── Proxy provider ───────────────────────────────────────────────
|
||||
if s.proxyProvider != nil {
|
||||
result["proxy"] = s.proxyHealth(ctx)
|
||||
proxyInfo := s.proxyHealth(ctx)
|
||||
if !isAdmin {
|
||||
proxyInfo = filterFields(proxyInfo, nonAdminProxyFields)
|
||||
}
|
||||
result["proxy"] = proxyInfo
|
||||
}
|
||||
|
||||
respondJSON(w, http.StatusOK, result)
|
||||
}
|
||||
|
||||
// filterFields builds a new map holding only those entries of m whose key is
// marked true in allow. The input map is never modified, and the result is
// always non-nil, even when m is nil or nothing is allowed.
func filterFields(m map[string]any, allow map[string]bool) map[string]any {
	kept := make(map[string]any, len(allow))
	for key, permitted := range allow {
		if !permitted {
			continue
		}
		if val, present := m[key]; present {
			kept[key] = val
		}
	}
	return kept
}
|
||||
|
||||
// dockerHealth probes the Docker daemon and, if reachable, attaches a full
|
||||
// DaemonInfo snapshot. The caller does not need to error-check the Info()
|
||||
// call — if it fails, the connected flag remains true (ping succeeded) but
|
||||
|
||||
@@ -4,12 +4,15 @@ import (
|
	"log/slog"
	"net"
	"net/http"
	"runtime/debug"
	"strings"
	"sync"
	"time"
|
||||
)
|
||||
|
||||
// logging is an HTTP middleware that logs every request with method, path,
|
||||
// status code, and duration.
|
||||
// status code, and duration. Webhook URLs are redacted before being logged
|
||||
// because the secret is the only authenticator — leaking it to log
|
||||
// aggregators is equivalent to leaking the credential.
|
||||
func logging(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
start := time.Now()
|
||||
@@ -19,13 +22,26 @@ func logging(next http.Handler) http.Handler {
|
||||
|
||||
slog.Info("http request",
|
||||
"method", r.Method,
|
||||
"path", r.URL.Path,
|
||||
"path", redactPath(r.URL.Path),
|
||||
"status", wrapped.status,
|
||||
"duration", time.Since(start).String(),
|
||||
)
|
||||
})
|
||||
}
|
||||
|
||||
// redactPath masks the secret-bearing tail of webhook URL paths before they
// are logged. Everything after a known webhook prefix is replaced by "***";
// any other path is returned unchanged.
func redactPath(path string) string {
	// The more specific site prefix is checked first, because the project
	// prefix is itself a prefix of it.
	for _, prefix := range [...]string{"/api/webhook/sites/", "/api/webhook/"} {
		if strings.HasPrefix(path, prefix) {
			return prefix + "***"
		}
	}
	return path
}
|
||||
|
||||
// recovery is an HTTP middleware that catches panics and returns a 500 response.
|
||||
func recovery(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
@@ -145,6 +161,24 @@ func jsonContentType(next http.Handler) http.Handler {
|
||||
})
|
||||
}
|
||||
|
||||
// rateLimitMiddleware wraps a handler with per-IP rate limiting using the
|
||||
// supplied limiter. Requests over the limit get 429.
|
||||
func rateLimitMiddleware(rl *rateLimiter) func(http.Handler) http.Handler {
|
||||
return func(next http.Handler) http.Handler {
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
ip := r.RemoteAddr
|
||||
if fwd := r.Header.Get("X-Forwarded-For"); fwd != "" {
|
||||
ip = fwd
|
||||
}
|
||||
if !rl.allow(ip) {
|
||||
respondError(w, http.StatusTooManyRequests, "rate limit exceeded")
|
||||
return
|
||||
}
|
||||
next.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// statusRecorder wraps http.ResponseWriter to capture the status code.
|
||||
type statusRecorder struct {
|
||||
http.ResponseWriter
|
||||
|
||||
@@ -47,6 +47,7 @@ type Server struct {
|
||||
staticSiteManager *staticsite.Manager
|
||||
stackManager *stack.Manager
|
||||
backupEngine *backup.Engine
|
||||
sseGate *sseGate
|
||||
dbPath string
|
||||
shutdownFunc func() // called after restore to trigger graceful shutdown
|
||||
onBackupSettingsChanged func(enabled bool, intervalHours int) // called when backup settings change
|
||||
@@ -76,6 +77,7 @@ func NewServer(
|
||||
eventBus: eventBus,
|
||||
encKey: encKey,
|
||||
localAuth: localAuth,
|
||||
sseGate: newSSEGate(maxConcurrentSSEStreams),
|
||||
}
|
||||
|
||||
// Try to initialize OIDC provider from stored settings.
|
||||
@@ -187,6 +189,7 @@ func (s *Server) Router() chi.Router {
|
||||
r.Use(cors)
|
||||
|
||||
loginLimiter := newRateLimiter()
|
||||
webhookLimiter := newRateLimiter()
|
||||
|
||||
r.Route("/api", func(r chi.Router) {
|
||||
// JSON content type and body size limit for API routes.
|
||||
@@ -201,7 +204,10 @@ func (s *Server) Router() chi.Router {
|
||||
r.Post("/auth/oidc/token", s.oidcExchangeToken)
|
||||
|
||||
// Webhook handler (uses its own secret-based auth).
|
||||
r.Mount("/webhook", s.webhook.Route())
|
||||
// Per-IP rate limit prevents an attacker who has guessed (or leaked)
|
||||
// a secret from triggering a deploy storm, and rejects unauthenticated
|
||||
// brute-force probes over the secret URL space.
|
||||
r.With(rateLimitMiddleware(webhookLimiter)).Mount("/webhook", s.webhook.Route())
|
||||
|
||||
// Protected routes: require valid JWT.
|
||||
r.Group(func(r chi.Router) {
|
||||
@@ -340,7 +346,7 @@ func (s *Server) Router() chi.Router {
|
||||
// System resources (read-only).
|
||||
r.Get("/system/stats", s.getSystemStats)
|
||||
r.Get("/system/stats/history", s.getSystemStatsHistory)
|
||||
r.Get("/system/stats/top", s.listTopContainersByCPU)
|
||||
r.Get("/system/stats/top", s.listTopContainers)
|
||||
|
||||
// Admin-only routes: require admin role.
|
||||
r.Group(func(r chi.Router) {
|
||||
|
||||
@@ -55,6 +55,12 @@ func (s *Server) streamDeployLogs(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// SSE mode.
|
||||
release, ok := acquireSSESlot(w, s.sseGate)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
defer release()
|
||||
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
respondError(w, http.StatusInternalServerError, "streaming not supported")
|
||||
@@ -140,6 +146,12 @@ func (s *Server) streamDeployLogs(w http.ResponseWriter, r *http.Request) {
|
||||
// streamEvents handles GET /api/events.
|
||||
// It streams instance status changes and deploy status changes via SSE.
|
||||
func (s *Server) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
release, ok := acquireSSESlot(w, s.sseGate)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
defer release()
|
||||
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
respondError(w, http.StatusInternalServerError, "streaming not supported")
|
||||
|
||||
@@ -0,0 +1,40 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"net/http"
|
||||
"sync/atomic"
|
||||
)
|
||||
|
||||
// maxConcurrentSSEStreams caps the global number of in-flight SSE
|
||||
// connections. Each stream holds a goroutine, an event-bus subscription, and
|
||||
// (for log streams) a Docker daemon TCP socket; a single tab opening
|
||||
// thousands of EventSources would otherwise exhaust file descriptors.
|
||||
const maxConcurrentSSEStreams = 256
|
||||
|
||||
// sseGate is a counting gate that limits concurrent SSE streams.
type sseGate struct {
	cap int64        // maximum number of slots that may be held at once
	cur atomic.Int64 // number of slots currently held
}

// newSSEGate returns a gate that admits at most limit concurrent holders.
func newSSEGate(limit int) *sseGate { return &sseGate{cap: int64(limit)} }

// enter reserves a slot and returns a release func, or nil if the gate is full.
//
// The release func is idempotent: only its first call gives the slot back.
// Without that guard an accidental second call would drive the counter below
// the number of live streams and let the gate admit more than cap holders.
func (g *sseGate) enter() func() {
	// Optimistically take a slot, then roll back if that overshot the cap.
	if g.cur.Add(1) > g.cap {
		g.cur.Add(-1)
		return nil
	}
	var released atomic.Bool
	return func() {
		if released.CompareAndSwap(false, true) {
			g.cur.Add(-1)
		}
	}
}
|
||||
|
||||
// acquireSSESlot is a small helper used by every SSE handler to honour the
|
||||
// global cap. Returns false (and writes a 503) if the cap is reached.
|
||||
func acquireSSESlot(w http.ResponseWriter, gate *sseGate) (release func(), ok bool) {
|
||||
release = gate.enter()
|
||||
if release == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "stream limit reached")
|
||||
return nil, false
|
||||
}
|
||||
return release, true
|
||||
}
|
||||
@@ -4,15 +4,30 @@ import (
|
||||
"errors"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"sort"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/auth"
|
||||
"github.com/alexei/tinyforge/internal/stats"
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
)
|
||||
|
||||
// topConsumerMinWindow is the lower bound on how recent a container sample must be to count toward
|
||||
// the "top consumers" list. Scaled with the collector interval (read from
|
||||
// settings) so it stays meaningful even when sampling is sparse.
|
||||
const topConsumerMinWindow = 2 * time.Minute
|
||||
|
||||
// TopContainerSample augments a stats sample with the human-readable owner
|
||||
// name so the UI can show "project/stage" or the static-site name without an
|
||||
// extra round-trip per row.
|
||||
type TopContainerSample struct {
|
||||
store.ContainerStatsSample
|
||||
OwnerName string `json:"owner_name"`
|
||||
}
|
||||
|
||||
const (
|
||||
// defaultHistoryWindow is used when no ?window= param is provided or the
|
||||
// value fails to parse. Matches the default retention so the "last 2h"
|
||||
@@ -175,11 +190,11 @@ func (s *Server) streamStaticSiteLogs(w http.ResponseWriter, r *http.Request) {
|
||||
s.streamLogsForContainer(w, r, site.ContainerID)
|
||||
}
|
||||
|
||||
// listTopContainersByCPU handles GET /api/system/stats/top?limit=5&by=cpu.
|
||||
// listTopContainers handles GET /api/system/stats/top?limit=5&by=cpu.
|
||||
// Returns the top-N most recent samples across containers, sorted by CPU or
|
||||
// memory. Useful for a system dashboard "top consumers" widget without
|
||||
// requiring the frontend to aggregate per-container history on its own.
|
||||
func (s *Server) listTopContainersByCPU(w http.ResponseWriter, r *http.Request) {
|
||||
// memory. Container IDs are stripped for non-admins so a low-privilege viewer
|
||||
// cannot enumerate workloads outside their scope.
|
||||
func (s *Server) listTopContainers(w http.ResponseWriter, r *http.Request) {
|
||||
limit := 5
|
||||
if raw := r.URL.Query().Get("limit"); raw != "" {
|
||||
if n, err := strconv.Atoi(raw); err == nil && n > 0 && n <= 50 {
|
||||
@@ -191,9 +206,16 @@ func (s *Server) listTopContainersByCPU(w http.ResponseWriter, r *http.Request)
|
||||
by = "cpu"
|
||||
}
|
||||
|
||||
// Samples from the last 2 minutes window so "top" reflects near-current
|
||||
// load, not long-dead rows.
|
||||
samples, err := s.store.ListAllRecentContainerStatsSamples(sinceTimestamp(2 * time.Minute))
|
||||
// Samples must be at least as recent as max(2*interval, 2 minutes) so the
|
||||
// list reflects near-current load even when collection is sparse.
|
||||
window := topConsumerMinWindow
|
||||
if settings, err := s.store.GetSettings(); err == nil && settings.StatsIntervalSeconds > 0 {
|
||||
if w := time.Duration(settings.StatsIntervalSeconds*2) * time.Second; w > window {
|
||||
window = w
|
||||
}
|
||||
}
|
||||
|
||||
samples, err := s.store.ListAllRecentContainerStatsSamples(sinceTimestamp(window))
|
||||
if err != nil {
|
||||
slog.Error("failed to list container samples for top", "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "failed to list samples")
|
||||
@@ -213,33 +235,75 @@ func (s *Server) listTopContainersByCPU(w http.ResponseWriter, r *http.Request)
|
||||
top = append(top, sm)
|
||||
}
|
||||
|
||||
// Partial-sort by the requested metric, descending. For small N a simple
|
||||
// insertion-like approach is plenty.
|
||||
sortContainerSamples(top, by)
|
||||
sort.Slice(top, func(i, j int) bool {
|
||||
if by == "memory" {
|
||||
return top[i].MemoryUsage > top[j].MemoryUsage
|
||||
}
|
||||
return top[i].CPUPercent > top[j].CPUPercent
|
||||
})
|
||||
if len(top) > limit {
|
||||
top = top[:limit]
|
||||
}
|
||||
respondJSON(w, http.StatusOK, top)
|
||||
}
|
||||
|
||||
// sortContainerSamples sorts in place by CPU (or memory) descending.
|
||||
// Note: ListContainerStatsSamples with empty ownerID returns no rows — the
|
||||
// caller uses per-owner-type queries and merges; this helper is applied to
|
||||
// the already-merged slice.
|
||||
func sortContainerSamples(s []store.ContainerStatsSample, by string) {
|
||||
// O(n^2) is fine — N is small (bounded by the number of containers).
|
||||
for i := 1; i < len(s); i++ {
|
||||
for j := i; j > 0; j-- {
|
||||
var less bool
|
||||
if by == "memory" {
|
||||
less = s[j].MemoryUsage > s[j-1].MemoryUsage
|
||||
} else {
|
||||
less = s[j].CPUPercent > s[j-1].CPUPercent
|
||||
}
|
||||
if !less {
|
||||
break
|
||||
}
|
||||
s[j-1], s[j] = s[j], s[j-1]
|
||||
// Resolve owner names so the UI can show "project/stage" or the site name
|
||||
// without a per-row round trip.
|
||||
enriched := s.enrichWithOwnerNames(top)
|
||||
|
||||
// Scrub container IDs for non-admins. The owner name is the actionable
|
||||
// identifier; the container ID is a host-level handle that reveals
|
||||
// workload existence to viewers who shouldn't have it.
|
||||
claims, _ := auth.ClaimsFromContext(r.Context())
|
||||
if claims.Role != "admin" {
|
||||
for i := range enriched {
|
||||
enriched[i].ContainerID = ""
|
||||
}
|
||||
}
|
||||
|
||||
respondJSON(w, http.StatusOK, enriched)
|
||||
}
|
||||
|
||||
// enrichWithOwnerNames attaches a human-readable owner name to each sample.
//
// The result has the same length and order as samples. Names are resolved one
// sample at a time through lookupInstanceName / lookupSiteName (this is not a
// batched lookup), so the cost grows with len(samples). Samples whose owner
// type is neither an instance nor a site keep an empty OwnerName.
//
// NOTE(review): the visible caller truncates the slice to 'limit' before
// calling this, which keeps the number of lookups small — confirm for any
// other callers.
func (s *Server) enrichWithOwnerNames(samples []store.ContainerStatsSample) []TopContainerSample {
	out := make([]TopContainerSample, len(samples))
	for i, sm := range samples {
		out[i] = TopContainerSample{ContainerStatsSample: sm}
		switch sm.OwnerType {
		case stats.OwnerTypeInstance:
			out[i].OwnerName = s.lookupInstanceName(sm.OwnerID)
		case stats.OwnerTypeSite:
			out[i].OwnerName = s.lookupSiteName(sm.OwnerID)
		}
	}
	return out
}
|
||||
|
||||
// lookupInstanceName returns "project/stage" for an instance, or empty on
|
||||
// any lookup error so a transient miss does not break the response.
|
||||
func (s *Server) lookupInstanceName(instanceID string) string {
|
||||
inst, err := s.store.GetInstanceByID(instanceID)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
project, perr := s.store.GetProjectByID(inst.ProjectID)
|
||||
stage, serr := s.store.GetStageByID(inst.StageID)
|
||||
switch {
|
||||
case perr == nil && serr == nil:
|
||||
return project.Name + "/" + stage.Name
|
||||
case perr == nil:
|
||||
return project.Name
|
||||
case serr == nil:
|
||||
return stage.Name
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// lookupSiteName returns the site's display name or empty on lookup error.
|
||||
func (s *Server) lookupSiteName(siteID string) string {
|
||||
site, err := s.store.GetStaticSiteByID(siteID)
|
||||
if err != nil {
|
||||
return ""
|
||||
}
|
||||
return site.Name
|
||||
}
|
||||
|
||||
@@ -1,16 +1,28 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
)
|
||||
|
||||
// generateWebhookSecret produces a fresh webhook secret: 32 bytes read from
// crypto/rand, rendered as 64 lowercase hex characters.
//
// It mirrors the helper in internal/store and is duplicated here to avoid an
// import cycle, and so the rotation handlers use a real random secret rather
// than a uuid. A failure of the system random source is treated as
// unrecoverable and panics.
func generateWebhookSecret() string {
	var token [32]byte
	if _, err := rand.Read(token[:]); err != nil {
		panic("crypto/rand failed: " + err.Error())
	}
	return hex.EncodeToString(token[:])
}
|
||||
|
||||
// webhookURLResponse is the common payload returned by every webhook endpoint.
|
||||
// Clients never see raw secrets except at issue/rotate time via these fields;
|
||||
// the URL shape is "/api/webhook/..." so callers can prepend their own origin.
|
||||
@@ -58,7 +70,7 @@ func (s *Server) regenerateProjectWebhook(w http.ResponseWriter, r *http.Request
|
||||
return
|
||||
}
|
||||
|
||||
secret := uuid.New().String()
|
||||
secret := generateWebhookSecret()
|
||||
if err := s.store.SetProjectWebhookSecret(id, secret); err != nil {
|
||||
slog.Error("regenerate project webhook: set secret", "project", id, "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "failed to rotate webhook secret")
|
||||
@@ -107,7 +119,7 @@ func (s *Server) regenerateStaticSiteWebhook(w http.ResponseWriter, r *http.Requ
|
||||
return
|
||||
}
|
||||
|
||||
secret := uuid.New().String()
|
||||
secret := generateWebhookSecret()
|
||||
if err := s.store.SetStaticSiteWebhookSecret(id, secret); err != nil {
|
||||
slog.Error("regenerate site webhook: set secret", "site", id, "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "failed to rotate webhook secret")
|
||||
|
||||
+46
-23
@@ -3,9 +3,11 @@ package docker
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"time"
|
||||
|
||||
"github.com/moby/moby/client"
|
||||
"golang.org/x/sync/errgroup"
|
||||
)
|
||||
|
||||
// SystemStats is a host-level snapshot combining daemon capacity
|
||||
@@ -42,33 +44,54 @@ type SystemStats struct {
|
||||
DiskTotalBytes int64 `json:"disk_total_bytes"`
|
||||
}
|
||||
|
||||
// GetSystemStats returns a one-shot host-level snapshot. The Info() call
|
||||
// and disk usage call are made in sequence. Disk usage failures do not
|
||||
// fail the whole call — the result degrades gracefully with zero disk fields.
|
||||
// GetSystemStats returns a one-shot host-level snapshot. Info and DiskUsage
|
||||
// are issued in parallel because DiskUsage walks every layer/volume and is
|
||||
// often the slowest call on a busy host (1-3 s); Info typically completes in
|
||||
// ~10 ms. Disk usage failures do not fail the whole call — the result
|
||||
// degrades gracefully with zero disk fields and a warning log.
|
||||
func (c *Client) GetSystemStats(ctx context.Context) (SystemStats, error) {
|
||||
info, err := c.Info(ctx)
|
||||
if err != nil {
|
||||
return SystemStats{}, fmt.Errorf("system stats: %w", err)
|
||||
}
|
||||
stats := SystemStats{Timestamp: time.Now().UTC()}
|
||||
|
||||
stats := SystemStats{
|
||||
Timestamp: time.Now().UTC(),
|
||||
NCPU: info.NCPU,
|
||||
MemoryTotal: info.MemoryTotal,
|
||||
Containers: info.Containers,
|
||||
Running: info.Running,
|
||||
Paused: info.Paused,
|
||||
Stopped: info.Stopped,
|
||||
Images: info.Images,
|
||||
}
|
||||
g, gctx := errgroup.WithContext(ctx)
|
||||
|
||||
du, derr := c.api.DiskUsage(ctx, client.DiskUsageOptions{
|
||||
Containers: true,
|
||||
Images: true,
|
||||
Volumes: true,
|
||||
BuildCache: true,
|
||||
g.Go(func() error {
|
||||
info, err := c.Info(gctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("system stats info: %w", err)
|
||||
}
|
||||
stats.NCPU = info.NCPU
|
||||
stats.MemoryTotal = info.MemoryTotal
|
||||
stats.Containers = info.Containers
|
||||
stats.Running = info.Running
|
||||
stats.Paused = info.Paused
|
||||
stats.Stopped = info.Stopped
|
||||
stats.Images = info.Images
|
||||
return nil
|
||||
})
|
||||
if derr == nil {
|
||||
|
||||
var du *client.DiskUsageResult
|
||||
g.Go(func() error {
|
||||
usage, err := c.api.DiskUsage(gctx, client.DiskUsageOptions{
|
||||
Containers: true,
|
||||
Images: true,
|
||||
Volumes: true,
|
||||
BuildCache: true,
|
||||
})
|
||||
if err != nil {
|
||||
// Disk usage is best-effort; swallow but log so the dashboard
|
||||
// shows zeroed disk fields rather than failing entirely.
|
||||
slog.Warn("system stats: disk usage failed", "error", err)
|
||||
return nil
|
||||
}
|
||||
du = &usage
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := g.Wait(); err != nil {
|
||||
return SystemStats{}, err
|
||||
}
|
||||
|
||||
if du != nil {
|
||||
stats.DiskImagesBytes = du.Images.TotalSize
|
||||
stats.DiskContainersBytes = du.Containers.TotalSize
|
||||
stats.DiskVolumesBytes = du.Volumes.TotalSize
|
||||
|
||||
+46
-12
@@ -36,9 +36,11 @@ type Collector struct {
|
||||
store *store.Store
|
||||
docker *docker.Client
|
||||
|
||||
stopOnce sync.Once
|
||||
stop chan struct{}
|
||||
done chan struct{}
|
||||
startOnce sync.Once
|
||||
stopOnce sync.Once
|
||||
started bool
|
||||
stop chan struct{}
|
||||
done chan struct{}
|
||||
}
|
||||
|
||||
// New creates a new stats collector. Call Start to begin sampling.
|
||||
@@ -52,15 +54,24 @@ func New(s *store.Store, d *docker.Client) *Collector {
|
||||
}
|
||||
|
||||
// Start launches the background loop. Returns immediately. The loop exits
|
||||
// when Stop is called.
|
||||
// when Stop is called. Safe to call multiple times — only the first call has
|
||||
// an effect.
|
||||
func (c *Collector) Start() {
|
||||
go c.run()
|
||||
c.startOnce.Do(func() {
|
||||
c.started = true
|
||||
go c.run()
|
||||
})
|
||||
}
|
||||
|
||||
// Stop signals the collector to exit and blocks until it has finished the
|
||||
// in-flight tick.
|
||||
// in-flight tick. If Start was never called, Stop returns immediately.
|
||||
func (c *Collector) Stop() {
|
||||
c.stopOnce.Do(func() { close(c.stop) })
|
||||
c.stopOnce.Do(func() {
|
||||
close(c.stop)
|
||||
if !c.started {
|
||||
close(c.done)
|
||||
}
|
||||
})
|
||||
<-c.done
|
||||
}
|
||||
|
||||
@@ -70,6 +81,15 @@ func (c *Collector) Stop() {
|
||||
func (c *Collector) run() {
|
||||
defer close(c.done)
|
||||
|
||||
// Derive a base context that's cancelled when Stop is called so in-flight
|
||||
// Docker requests abort instead of waiting out their timeout.
|
||||
baseCtx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
go func() {
|
||||
<-c.stop
|
||||
cancel()
|
||||
}()
|
||||
|
||||
// Wait a few seconds before the first sample so the app has settled.
|
||||
select {
|
||||
case <-time.After(3 * time.Second):
|
||||
@@ -90,7 +110,7 @@ func (c *Collector) run() {
|
||||
}
|
||||
}
|
||||
|
||||
c.tick(retention)
|
||||
c.tick(baseCtx, retention)
|
||||
|
||||
select {
|
||||
case <-time.After(time.Duration(interval) * time.Second):
|
||||
@@ -126,8 +146,8 @@ func (c *Collector) readConfig() (intervalSeconds, retentionHours int) {
|
||||
// persists samples, and prunes rows beyond the retention window. When
|
||||
// the Docker daemon is unreachable the whole tick is skipped with a
|
||||
// single debug log instead of one warning per container.
|
||||
func (c *Collector) tick(retentionHours int) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
||||
func (c *Collector) tick(parent context.Context, retentionHours int) {
|
||||
ctx, cancel := context.WithTimeout(parent, 30*time.Second)
|
||||
defer cancel()
|
||||
|
||||
pingCtx, pingCancel := context.WithTimeout(ctx, 2*time.Second)
|
||||
@@ -224,10 +244,20 @@ func (c *Collector) sampleAll(ctx context.Context, targets []target) []store.Con
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for i, t := range targets {
|
||||
// Acquire the semaphore in the parent loop so ctx cancellation
|
||||
// short-circuits the queue rather than spawning goroutines that
|
||||
// block on an unreachable slot.
|
||||
select {
|
||||
case sem <- struct{}{}:
|
||||
case <-ctx.Done():
|
||||
break
|
||||
}
|
||||
if ctx.Err() != nil {
|
||||
break
|
||||
}
|
||||
wg.Add(1)
|
||||
go func(i int, t target) {
|
||||
defer wg.Done()
|
||||
sem <- struct{}{}
|
||||
defer func() { <-sem }()
|
||||
|
||||
sampleCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
||||
@@ -278,8 +308,12 @@ func (c *Collector) recordSystemSample(ctx context.Context, workloadCPU float64,
|
||||
slog.Warn("stats collector: get system stats", "error", err)
|
||||
return
|
||||
}
|
||||
ts := sysStats.Timestamp.Unix()
|
||||
if ts <= 0 {
|
||||
ts = time.Now().UTC().Unix()
|
||||
}
|
||||
sample := store.SystemStatsSample{
|
||||
TS: sysStats.Timestamp.Unix(),
|
||||
TS: ts,
|
||||
NCPU: sysStats.NCPU,
|
||||
MemoryTotal: sysStats.MemoryTotal,
|
||||
WorkloadCPUPercent: workloadCPU,
|
||||
|
||||
@@ -1,13 +1,34 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
"database/sql"
|
||||
"encoding/hex"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// minWebhookSecretLength is the smallest user-supplied webhook secret accepted
// at insert time, measured with len (bytes). Auto-generated secrets are 64 hex
// chars (256 bits); a 32-char floor still gives 128 bits of brute-force
// resistance for a hex alphabet and rejects obvious typos / placeholder strings.
|
||||
const minWebhookSecretLength = 32
|
||||
|
||||
// generateWebhookSecret returns a 256-bit hex-encoded random token. We use
|
||||
// crypto/rand directly rather than uuid.New() so the intent ("secret token,
|
||||
// not identifier") is explicit and the entropy is unambiguous.
|
||||
func generateWebhookSecret() string {
|
||||
b := make([]byte, 32)
|
||||
if _, err := rand.Read(b); err != nil {
|
||||
// crypto/rand is documented to never fail on supported platforms;
|
||||
// fall back to a UUID rather than panicking.
|
||||
return uuid.New().String()
|
||||
}
|
||||
return hex.EncodeToString(b)
|
||||
}
|
||||
|
||||
// projectCols is the canonical column list for projects queries.
|
||||
const projectCols = `id, name, registry, image, port, healthcheck, env, volumes,
|
||||
npm_access_list_id, webhook_secret, created_at, updated_at`
|
||||
@@ -19,7 +40,9 @@ func (s *Store) CreateProject(p Project) (Project, error) {
|
||||
p.CreatedAt = Now()
|
||||
p.UpdatedAt = p.CreatedAt
|
||||
if p.WebhookSecret == "" {
|
||||
p.WebhookSecret = uuid.New().String()
|
||||
p.WebhookSecret = generateWebhookSecret()
|
||||
} else if len(p.WebhookSecret) < minWebhookSecretLength {
|
||||
return Project{}, fmt.Errorf("webhook_secret must be at least %d characters", minWebhookSecretLength)
|
||||
}
|
||||
|
||||
_, err := s.db.Exec(
|
||||
@@ -163,7 +186,7 @@ func (s *Store) EnsureProjectWebhookSecret(id string) (string, error) {
|
||||
if project.WebhookSecret != "" {
|
||||
return project.WebhookSecret, nil
|
||||
}
|
||||
secret := uuid.New().String()
|
||||
secret := generateWebhookSecret()
|
||||
if err := s.SetProjectWebhookSecret(id, secret); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -22,7 +22,9 @@ func (s *Store) CreateStaticSite(site StaticSite) (StaticSite, error) {
|
||||
site.CreatedAt = Now()
|
||||
site.UpdatedAt = site.CreatedAt
|
||||
if site.WebhookSecret == "" {
|
||||
site.WebhookSecret = uuid.New().String()
|
||||
site.WebhookSecret = generateWebhookSecret()
|
||||
} else if len(site.WebhookSecret) < minWebhookSecretLength {
|
||||
return StaticSite{}, fmt.Errorf("webhook_secret must be at least %d characters", minWebhookSecretLength)
|
||||
}
|
||||
|
||||
_, err := s.db.Exec(
|
||||
@@ -301,7 +303,7 @@ func (s *Store) EnsureStaticSiteWebhookSecret(id string) (string, error) {
|
||||
if site.WebhookSecret != "" {
|
||||
return site.WebhookSecret, nil
|
||||
}
|
||||
secret := uuid.New().String()
|
||||
secret := generateWebhookSecret()
|
||||
if err := s.SetStaticSiteWebhookSecret(id, secret); err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
@@ -139,18 +139,28 @@ func (s *Store) ListSystemStatsSamples(sinceTS int64) ([]SystemStatsSample, erro
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// PruneStatsSamplesBefore deletes all samples older than the given unix timestamp
|
||||
// from both the container and system stats tables. Returns rows deleted across
|
||||
// both tables.
|
||||
// PruneStatsSamplesBefore deletes all samples older than the given unix
|
||||
// timestamp from both the container and system stats tables in a single
|
||||
// transaction so a crash between the two cannot leave one table pruned and
|
||||
// the other not. Returns rows deleted across both tables.
|
||||
func (s *Store) PruneStatsSamplesBefore(ts int64) (int64, error) {
|
||||
r1, err := s.db.Exec(`DELETE FROM container_stats_samples WHERE ts < ?`, ts)
|
||||
tx, err := s.db.Begin()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("begin prune tx: %w", err)
|
||||
}
|
||||
defer tx.Rollback()
|
||||
|
||||
r1, err := tx.Exec(`DELETE FROM container_stats_samples WHERE ts < ?`, ts)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("prune container stats samples: %w", err)
|
||||
}
|
||||
r2, err := s.db.Exec(`DELETE FROM system_stats_samples WHERE ts < ?`, ts)
|
||||
r2, err := tx.Exec(`DELETE FROM system_stats_samples WHERE ts < ?`, ts)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("prune system stats samples: %w", err)
|
||||
}
|
||||
if err := tx.Commit(); err != nil {
|
||||
return 0, fmt.Errorf("commit prune tx: %w", err)
|
||||
}
|
||||
n1, _ := r1.RowsAffected()
|
||||
n2, _ := r2.RowsAffected()
|
||||
return n1 + n2, nil
|
||||
|
||||
+12
-2
@@ -4,6 +4,7 @@ import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
_ "modernc.org/sqlite"
|
||||
@@ -214,8 +215,17 @@ func (s *Store) runMigrations() error {
|
||||
}
|
||||
|
||||
for _, m := range migrations {
|
||||
// Ignore errors from already-applied migrations (duplicate column).
|
||||
_, _ = s.db.Exec(m)
|
||||
if _, err := s.db.Exec(m); err != nil {
|
||||
// "duplicate column" / "already exists" are expected when a
|
||||
// migration has already been applied. Anything else (typo, FK
|
||||
// conflict, real schema bug) must surface, otherwise the store
|
||||
// silently runs against the wrong shape.
|
||||
msg := err.Error()
|
||||
if !strings.Contains(msg, "duplicate column") &&
|
||||
!strings.Contains(msg, "already exists") {
|
||||
return fmt.Errorf("apply migration %q: %w", m, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Create indexes on foreign key columns for query performance.
|
||||
|
||||
@@ -5,15 +5,27 @@ import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
)
|
||||
|
||||
// maxSiteConcurrentSyncs caps fan-out of background site syncs triggered by
|
||||
// webhooks. Above this limit, requests are rejected with 503.
|
||||
const maxSiteConcurrentSyncs = 4
|
||||
|
||||
// maxWebhookBodyBytes caps the request body size for webhook payloads. The
|
||||
// /api routes already wrap the body with MaxBytesReader, but the webhook
|
||||
// router relies on its own limit so changes to the parent middleware can't
|
||||
// silently increase the cap.
|
||||
const maxWebhookBodyBytes = 256 * 1024 // 256 KiB
|
||||
|
||||
// DeployTriggerer is called when a webhook determines a deploy should happen.
|
||||
// Same interface as registry.DeployTriggerer — kept separate to avoid import cycles.
|
||||
type DeployTriggerer interface {
|
||||
@@ -114,12 +126,28 @@ type Handler struct {
|
||||
store *store.Store
|
||||
deployer DeployTriggerer
|
||||
sites SiteSyncTriggerer
|
||||
|
||||
// Site sync coordination — webhooks fire syncs in the background; Drain
|
||||
// blocks until those goroutines finish, so a graceful shutdown does not
|
||||
// kill an in-flight git fetch + container rebuild.
|
||||
siteSyncCtx context.Context
|
||||
siteSyncCancel context.CancelFunc
|
||||
siteSyncWG sync.WaitGroup
|
||||
siteSyncSem chan struct{}
|
||||
}
|
||||
|
||||
// NewHandler creates a new webhook Handler. The sites triggerer is optional
|
||||
// and may be nil (site webhooks will return 404).
|
||||
func NewHandler(st *store.Store, deployer DeployTriggerer, sites SiteSyncTriggerer) *Handler {
|
||||
return &Handler{store: st, deployer: deployer, sites: sites}
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
return &Handler{
|
||||
store: st,
|
||||
deployer: deployer,
|
||||
sites: sites,
|
||||
siteSyncCtx: ctx,
|
||||
siteSyncCancel: cancel,
|
||||
siteSyncSem: make(chan struct{}, maxSiteConcurrentSyncs),
|
||||
}
|
||||
}
|
||||
|
||||
// SetSiteSyncTriggerer injects the static-site manager after construction.
|
||||
@@ -130,6 +158,13 @@ func (h *Handler) SetSiteSyncTriggerer(s SiteSyncTriggerer) {
|
||||
h.sites = s
|
||||
}
|
||||
|
||||
// Drain cancels in-flight site syncs and waits for their goroutines to exit.
|
||||
// Safe to call from a graceful-shutdown path.
|
||||
func (h *Handler) Drain() {
|
||||
h.siteSyncCancel()
|
||||
h.siteSyncWG.Wait()
|
||||
}
|
||||
|
||||
// Route returns a chi router with the webhook endpoints mounted.
|
||||
//
|
||||
// Routes:
|
||||
@@ -183,7 +218,8 @@ func (h *Handler) handleWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
var payload Payload
|
||||
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
||||
dec := json.NewDecoder(io.LimitReader(r.Body, maxWebhookBodyBytes))
|
||||
if err := dec.Decode(&payload); err != nil {
|
||||
respondWebhookError(w, http.StatusBadRequest, "invalid JSON payload")
|
||||
return
|
||||
}
|
||||
@@ -302,10 +338,20 @@ func (h *Handler) handleSiteWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Body is optional — decode best-effort.
|
||||
// Body is optional. We attempt to decode but accept an empty body (no Ref
|
||||
// filter); a malformed non-empty body is treated as bad-request to avoid
|
||||
// silently bypassing the branch/tag filter.
|
||||
var payload SitePayload
|
||||
if r.ContentLength > 0 {
|
||||
_ = json.NewDecoder(r.Body).Decode(&payload)
|
||||
body, err := io.ReadAll(io.LimitReader(r.Body, maxWebhookBodyBytes))
|
||||
if err != nil {
|
||||
respondWebhookError(w, http.StatusBadRequest, "failed to read request body")
|
||||
return
|
||||
}
|
||||
if len(body) > 0 {
|
||||
if err := json.Unmarshal(body, &payload); err != nil {
|
||||
respondWebhookError(w, http.StatusBadRequest, "invalid JSON payload")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
if payload.Ref != "" && !siteRefMatches(site, payload.Ref) {
|
||||
@@ -320,9 +366,20 @@ func (h *Handler) handleSiteWebhook(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
// Fire and forget — sync may take a while (git fetch + container rebuild).
|
||||
// Cap concurrent syncs so a runaway CI cannot fan out unbounded
|
||||
// git-clone goroutines.
|
||||
select {
|
||||
case h.siteSyncSem <- struct{}{}:
|
||||
default:
|
||||
respondWebhookError(w, http.StatusServiceUnavailable, "site sync queue full")
|
||||
return
|
||||
}
|
||||
|
||||
h.siteSyncWG.Add(1)
|
||||
go func(siteID, siteName string) {
|
||||
if err := h.sites.Deploy(context.Background(), siteID, false); err != nil {
|
||||
defer h.siteSyncWG.Done()
|
||||
defer func() { <-h.siteSyncSem }()
|
||||
if err := h.sites.Deploy(h.siteSyncCtx, siteID, false); err != nil {
|
||||
slog.Error("webhook: site sync failed", "site", siteName, "error", err)
|
||||
}
|
||||
}(site.ID, site.Name)
|
||||
|
||||
@@ -2,6 +2,7 @@ package webhook
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"path"
|
||||
"strings"
|
||||
|
||||
@@ -24,7 +25,8 @@ func matchStage(st *store.Store, projectID, tag string) (store.Stage, bool, erro
|
||||
|
||||
matched, err := path.Match(pattern, tag)
|
||||
if err != nil {
|
||||
// Invalid pattern — skip this stage.
|
||||
slog.Warn("webhook: invalid tag pattern, skipping stage",
|
||||
"project", projectID, "stage", stage.Name, "pattern", pattern, "error", err)
|
||||
continue
|
||||
}
|
||||
if matched {
|
||||
@@ -36,9 +38,21 @@ func matchStage(st *store.Store, projectID, tag string) (store.Stage, bool, erro
|
||||
}
|
||||
|
||||
// imageMatches reports whether an incoming image reference matches the
// project's stored image.
//
// Identical strings always match. Otherwise each reference is split at its
// first "/": the part before it is treated as the registry host and compared
// case-insensitively (hostnames are case-insensitive), while everything after
// it (owner, name, tag) must match exactly. References without a "/" or with
// an empty host part only match when they are byte-for-byte identical.
func imageMatches(projectImage, incomingImage string) bool {
	if projectImage == incomingImage {
		return true
	}

	projectHost, projectPath, projectHasSlash := strings.Cut(projectImage, "/")
	incomingHost, incomingPath, incomingHasSlash := strings.Cut(incomingImage, "/")
	if !projectHasSlash || !incomingHasSlash || projectHost == "" || incomingHost == "" {
		return false
	}

	return projectPath == incomingPath && strings.EqualFold(projectHost, incomingHost)
}
|
||||
|
||||
// siteRefMatches reports whether a Git ref (e.g. "refs/heads/main" or
|
||||
|
||||
Reference in New Issue
Block a user