Files
tiny-forge/internal/api/docker.go
T
alexei.dolgolyov 97f338fba3 feat(maintenance): add Docker build-cache prune action
Add an admin-only POST /api/docker/prune-build-cache endpoint plus a Settings > Maintenance danger-zone button to reclaim disk used by the Docker build cache (image + static-site builds), which previously grew unbounded with no UI lever. Prunes unused-only (all=false) so a warm cache is preserved for apps redeploying soon. Mirrors the existing prune-images vertical slice; full en/ru i18n parity.
2026-06-02 13:34:05 +03:00

380 lines
11 KiB
Go

package api
import (
"bufio"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/alexei/tinyforge/internal/store"
)
// Limits and constants for the log endpoints.
const (
defaultLogTail = 200
maxLogTail = 5000
maxJSONLogBytes = 4 << 20 // 4 MiB cap for non-streaming log responses
maxLogLineBytes = 1 << 20 // 1 MiB max line length for the bufio.Scanner
logHeartbeatPeriod = 20 * time.Second
)
// ANSI escape sequence patterns. Stripped from streamed log lines so a
// hostile container cannot inject terminal control sequences (cursor moves,
// hyperlink escapes, screen clears) into operator displays or pasted output.
var (
ansiCSIPattern = regexp.MustCompile(`\x1b\[[0-9;?]*[ -/]*[@-~]`)
ansiOSCPattern = regexp.MustCompile(`\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)`)
ctlBytePattern = regexp.MustCompile(`[\x00-\x08\x0b-\x1a\x1c-\x1f\x7f]`)
)
// streamLogsForContainer streams logs for an arbitrary container ID using the
// shared SSE/JSON dual-mode pattern. Owner-specific handlers (workload-container)
// should validate ownership and then delegate here.
func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request, containerID string) {
if s.docker == nil {
respondError(w, http.StatusServiceUnavailable, "Docker is not available")
return
}
tail := parseTailParam(r.URL.Query().Get("tail"))
follow := r.URL.Query().Get("follow") == "true"
// Check if client accepts SSE.
accept := r.Header.Get("Accept")
isSSE := strings.Contains(accept, "text/event-stream")
logReader, err := s.docker.ContainerLogs(r.Context(), containerID, follow && isSSE, tail)
if err != nil {
slog.Error("failed to get container logs", "container", containerID, "error", err)
respondError(w, http.StatusInternalServerError, "failed to get container logs")
return
}
defer logReader.Close()
if !isSSE {
// JSON mode: cap the total bytes read so a chatty container with
// tail=large cannot exhaust server memory.
scanner := bufio.NewScanner(io.LimitReader(logReader, maxJSONLogBytes))
scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
var lines []string
for scanner.Scan() {
line := sanitizeDockerLogLine(scanner.Text())
if line != "" {
lines = append(lines, line)
}
}
if lines == nil {
lines = []string{}
}
respondJSON(w, http.StatusOK, lines)
return
}
// SSE mode: stream lines as they arrive.
release, ok := acquireSSESlot(w, s.sseGate)
if !ok {
return
}
defer release()
flusher, ok := w.(http.Flusher)
if !ok {
respondError(w, http.StatusInternalServerError, "streaming not supported")
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// Heartbeat keeps the connection warm through proxies that close idle
// streams. Sent as an SSE comment which the EventSource API ignores.
heartbeat := time.NewTicker(logHeartbeatPeriod)
defer heartbeat.Stop()
heartbeatDone := make(chan struct{})
defer close(heartbeatDone)
var hbMu sync.Mutex
go func() {
for {
select {
case <-heartbeat.C:
hbMu.Lock()
_, _ = io.WriteString(w, ": ping\n\n")
flusher.Flush()
hbMu.Unlock()
case <-heartbeatDone:
return
case <-r.Context().Done():
return
}
}
}()
scanner := bufio.NewScanner(logReader)
scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
for scanner.Scan() {
line := sanitizeDockerLogLine(scanner.Text())
if line == "" {
continue
}
data, _ := json.Marshal(map[string]string{"line": line})
hbMu.Lock()
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
hbMu.Unlock()
// Check if client disconnected.
select {
case <-r.Context().Done():
return
default:
}
}
}
// parseTailParam validates and clamps the ?tail= query value. Empty/invalid
// inputs fall back to the default; values above the cap are clamped down.
// "all" is rejected — letting the caller request unbounded log history is a
// trivial DoS vector.
func parseTailParam(raw string) string {
if raw == "" {
return strconv.Itoa(defaultLogTail)
}
n, err := strconv.Atoi(raw)
if err != nil || n <= 0 {
return strconv.Itoa(defaultLogTail)
}
if n > maxLogTail {
n = maxLogTail
}
return strconv.Itoa(n)
}
// sanitizeDockerLogLine strips the Docker log stream header (8-byte prefix)
// that Docker adds to non-TTY container logs, and removes terminal control
// sequences so a hostile container cannot inject ANSI escapes that hijack an
// operator's terminal when log output is pasted or rendered raw.
func sanitizeDockerLogLine(line string) string {
// Docker multiplexed stream: first 8 bytes are header (stream type + size).
// If the line starts with a non-printable byte followed by 0x00 0x00 0x00, strip 8 bytes.
if len(line) > 8 && (line[0] == 1 || line[0] == 2) && line[1] == 0 && line[2] == 0 && line[3] == 0 {
line = line[8:]
}
line = ansiOSCPattern.ReplaceAllString(line, "")
line = ansiCSIPattern.ReplaceAllString(line, "")
line = ctlBytePattern.ReplaceAllString(line, "")
return line
}
// buildActiveImagesSet returns the set of "image:tag" strings currently used
// by any container, computed in a single DB pass against the normalized
// containers index. Returning an error (rather than swallowing) prevents
// prune logic from treating a transient DB failure as "nothing is active".
func buildActiveImagesSet(st *store.Store) (map[string]bool, error) {
containers, err := st.ListContainers(store.ContainerFilter{})
if err != nil {
return nil, fmt.Errorf("list containers: %w", err)
}
active := make(map[string]bool, len(containers))
for _, c := range containers {
if c.ImageRef == "" {
continue
}
active[c.ImageRef] = true
}
return active, nil
}
// workloadImageBases returns the set of "image" strings (no tag) that
// some workload currently mounts to, derived from container.image_ref.
// This replaces the legacy "list all projects → projects[].Image" view
// after the workload-first cutover.
func workloadImageBases(st *store.Store) (map[string]bool, error) {
containers, err := st.ListContainers(store.ContainerFilter{})
if err != nil {
return nil, fmt.Errorf("list containers: %w", err)
}
bases := make(map[string]bool, len(containers))
for _, c := range containers {
if c.ImageRef == "" {
continue
}
ref, _ := splitImageTag(c.ImageRef)
if ref != "" {
bases[ref] = true
}
}
return bases, nil
}
// splitImageTag splits "image:tag" into image and tag parts. Returns the
// full string and empty tag if no colon separator is found. Inlined here
// because the legacy deploys.go that owned it was removed.
func splitImageTag(ref string) (string, string) {
if idx := strings.LastIndex(ref, ":"); idx != -1 {
afterColon := ref[idx+1:]
if !strings.Contains(afterColon, "/") {
return ref[:idx], afterColon
}
}
return ref, ""
}
// unusedImageStats handles GET /api/docker/unused-images. Returns the total
// size of unused workload images and whether the threshold is exceeded.
func (s *Server) unusedImageStats(w http.ResponseWriter, r *http.Request) {
if s.docker == nil {
respondJSON(w, http.StatusOK, map[string]any{
"total_size_mb": 0, "count": 0, "threshold_mb": 0, "exceeded": false,
})
return
}
settings, err := s.store.GetSettings()
if err != nil {
slog.Error("unused images: get settings", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
imageBases, err := workloadImageBases(s.store)
if err != nil {
slog.Error("unused images: list workload images", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
activeImages, err := buildActiveImagesSet(s.store)
if err != nil {
slog.Error("unused images: build active set", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
ctx := r.Context()
var totalSize int64
var count int
for base := range imageBases {
images, err := s.docker.ListImagesByRef(ctx, base)
if err != nil {
continue
}
for _, img := range images {
if !activeImages[img.Ref] {
totalSize += img.Size
count++
}
}
}
totalMB := totalSize / (1024 * 1024)
exceeded := settings.ImagePruneThresholdMB > 0 && int(totalMB) >= settings.ImagePruneThresholdMB
respondJSON(w, http.StatusOK, map[string]any{
"total_size_mb": totalMB,
"count": count,
"threshold_mb": settings.ImagePruneThresholdMB,
"exceeded": exceeded,
})
}
// pruneImages handles POST /api/docker/prune-images. Only removes images that
// some workload references (via container.image_ref), never arbitrary host
// images.
func (s *Server) pruneImages(w http.ResponseWriter, r *http.Request) {
if s.docker == nil {
respondError(w, http.StatusServiceUnavailable, "Docker is not available")
return
}
imageBases, err := workloadImageBases(s.store)
if err != nil {
slog.Error("prune: list workload images", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
activeImages, err := buildActiveImagesSet(s.store)
if err != nil {
slog.Error("prune: build active set", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
if len(imageBases) == 0 {
respondJSON(w, http.StatusOK, map[string]any{
"images_removed": 0,
"space_reclaimed_mb": 0,
"message": "No workload images to clean up",
})
return
}
ctx := r.Context()
removed := 0
var reclaimedBytes int64
for base := range imageBases {
images, err := s.docker.ListImagesByRef(ctx, base)
if err != nil {
slog.Warn("prune: list images", "image", base, "error", err)
continue
}
for _, img := range images {
if activeImages[img.Ref] {
continue
}
if err := s.docker.RemoveImage(ctx, img.ID); err != nil {
slog.Warn("prune: remove image", "image", img.Ref, "error", err)
continue
}
removed++
reclaimedBytes += img.Size
slog.Info("prune: removed image", "ref", img.Ref, "size_mb", img.Size/(1024*1024))
}
}
respondJSON(w, http.StatusOK, map[string]any{
"images_removed": removed,
"space_reclaimed_mb": reclaimedBytes / (1024 * 1024),
})
}
// pruneBuildCache handles POST /api/docker/prune-build-cache. It removes
// unused Docker build-cache records daemon-wide (all=false), so an app's next
// rebuild still hits its warm cache. The build cache is regenerable by
// definition — pruning only forces slower rebuilds, never data loss — and the
// dockerfile/static deploy paths never reclaim it on teardown, so it grows
// monotonically until pruned here.
func (s *Server) pruneBuildCache(w http.ResponseWriter, r *http.Request) {
if s.docker == nil {
respondError(w, http.StatusServiceUnavailable, "Docker is not available")
return
}
result, err := s.docker.PruneBuildCache(r.Context(), false)
if err != nil {
slog.Error("prune: build cache", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
slog.Info("prune: build cache",
"caches_deleted", result.CachesDeleted,
"space_reclaimed_mb", result.SpaceReclaimed/(1024*1024))
respondJSON(w, http.StatusOK, map[string]any{
"caches_deleted": result.CachesDeleted,
"space_reclaimed_mb": result.SpaceReclaimed / (1024 * 1024),
})
}