Files
tiny-forge/internal/api/docker.go
T
alexei.dolgolyov 7f2d1bdae1 feat(workload): switch buildActiveImagesSet to containers index
First consumer migration off the instances table. The image
prune logic now walks the normalized containers.image_ref
column directly — one DB pass against a single table instead
of joining instances against projects to reconstruct the full
"image:tag" string. Demonstrates the consumer-switch pattern
the remaining read sites (proxies, stale scanner, webhook
matcher) will follow.

The legacy `projects []store.Project` parameter is kept on the
function signature for now so call sites don't change in this
commit; the underscore-discard in the body makes it explicit
that it's no longer load-bearing.
2026-05-09 13:47:20 +03:00

413 lines
12 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package api
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net/http"
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/go-chi/chi/v5"
"github.com/alexei/tinyforge/internal/store"
)
// Limits and constants for the log endpoints.
const (
defaultLogTail = 200
maxLogTail = 5000
maxJSONLogBytes = 4 << 20 // 4 MiB cap for non-streaming log responses
maxLogLineBytes = 1 << 20 // 1 MiB max line length for the bufio.Scanner
logHeartbeatPeriod = 20 * time.Second
)
// ANSI escape sequence patterns. Stripped from streamed log lines so a
// hostile container cannot inject terminal control sequences (cursor moves,
// hyperlink escapes, screen clears) into operator displays or pasted output.
var (
ansiCSIPattern = regexp.MustCompile(`\x1b\[[0-9;?]*[ -/]*[@-~]`)
ansiOSCPattern = regexp.MustCompile(`\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)`)
ctlBytePattern = regexp.MustCompile(`[\x00-\x08\x0b-\x1a\x1c-\x1f\x7f]`)
)
// listProjectImages handles GET /api/projects/{id}/images.
// Returns all local Docker images matching the project's image reference.
func (s *Server) listProjectImages(w http.ResponseWriter, r *http.Request) {
id := chi.URLParam(r, "id")
project, err := s.store.GetProjectByID(id)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
respondNotFound(w, "project")
return
}
slog.Error("failed to get project", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
if s.docker == nil || project.Image == "" {
respondJSON(w, http.StatusOK, []any{})
return
}
images, err := s.docker.ListImagesByRef(r.Context(), project.Image)
if err != nil {
slog.Warn("list project images", "project", project.Name, "error", err)
respondJSON(w, http.StatusOK, []any{})
return
}
respondJSON(w, http.StatusOK, images)
}
// streamContainerLogs handles GET /api/projects/{id}/stages/{stage}/instances/{iid}/logs.
// Streams container logs via SSE. Supports query params:
// - tail: number of lines from end (default "200")
// - follow: "true" to stream new lines in real-time
func (s *Server) streamContainerLogs(w http.ResponseWriter, r *http.Request) {
projectID := chi.URLParam(r, "id")
stageID := chi.URLParam(r, "stage")
instanceID := chi.URLParam(r, "iid")
inst, err := s.store.GetInstanceByID(instanceID)
if err != nil {
if errors.Is(err, store.ErrNotFound) {
respondNotFound(w, "instance")
return
}
slog.Error("failed to get instance", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
// Verify the instance actually belongs to the project/stage in the path.
// Without this, a user could stream logs for any instance ID by guessing
// it under the wrong project — defence-in-depth for future per-project ACLs.
if inst.ProjectID != projectID || inst.StageID != stageID {
respondNotFound(w, "instance")
return
}
if inst.ContainerID == "" {
respondError(w, http.StatusBadRequest, "instance has no container")
return
}
s.streamLogsForContainer(w, r, inst.ContainerID)
}
// streamLogsForContainer streams logs for an arbitrary container ID using the
// shared SSE/JSON dual-mode pattern. Owner-specific handlers (instance, site)
// should validate ownership and then delegate here.
func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request, containerID string) {
if s.docker == nil {
respondError(w, http.StatusServiceUnavailable, "Docker is not available")
return
}
tail := parseTailParam(r.URL.Query().Get("tail"))
follow := r.URL.Query().Get("follow") == "true"
// Check if client accepts SSE.
accept := r.Header.Get("Accept")
isSSE := strings.Contains(accept, "text/event-stream")
logReader, err := s.docker.ContainerLogs(r.Context(), containerID, follow && isSSE, tail)
if err != nil {
slog.Error("failed to get container logs", "container", containerID, "error", err)
respondError(w, http.StatusInternalServerError, "failed to get container logs")
return
}
defer logReader.Close()
if !isSSE {
// JSON mode: cap the total bytes read so a chatty container with
// tail=large cannot exhaust server memory.
scanner := bufio.NewScanner(io.LimitReader(logReader, maxJSONLogBytes))
scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
var lines []string
for scanner.Scan() {
line := sanitizeDockerLogLine(scanner.Text())
if line != "" {
lines = append(lines, line)
}
}
if lines == nil {
lines = []string{}
}
respondJSON(w, http.StatusOK, lines)
return
}
// SSE mode: stream lines as they arrive.
release, ok := acquireSSESlot(w, s.sseGate)
if !ok {
return
}
defer release()
flusher, ok := w.(http.Flusher)
if !ok {
respondError(w, http.StatusInternalServerError, "streaming not supported")
return
}
w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
// Heartbeat keeps the connection warm through proxies that close idle
// streams. Sent as an SSE comment which the EventSource API ignores.
heartbeat := time.NewTicker(logHeartbeatPeriod)
defer heartbeat.Stop()
heartbeatDone := make(chan struct{})
defer close(heartbeatDone)
var hbMu sync.Mutex
go func() {
for {
select {
case <-heartbeat.C:
hbMu.Lock()
_, _ = io.WriteString(w, ": ping\n\n")
flusher.Flush()
hbMu.Unlock()
case <-heartbeatDone:
return
case <-r.Context().Done():
return
}
}
}()
scanner := bufio.NewScanner(logReader)
scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
for scanner.Scan() {
line := sanitizeDockerLogLine(scanner.Text())
if line == "" {
continue
}
data, _ := json.Marshal(map[string]string{"line": line})
hbMu.Lock()
fmt.Fprintf(w, "data: %s\n\n", data)
flusher.Flush()
hbMu.Unlock()
// Check if client disconnected.
select {
case <-r.Context().Done():
return
default:
}
}
}
// parseTailParam validates and clamps the ?tail= query value. Empty/invalid
// inputs fall back to the default; values above the cap are clamped down.
// "all" is rejected — letting the caller request unbounded log history is a
// trivial DoS vector.
func parseTailParam(raw string) string {
if raw == "" {
return strconv.Itoa(defaultLogTail)
}
n, err := strconv.Atoi(raw)
if err != nil || n <= 0 {
return strconv.Itoa(defaultLogTail)
}
if n > maxLogTail {
n = maxLogTail
}
return strconv.Itoa(n)
}
// sanitizeDockerLogLine strips the Docker log stream header (8-byte prefix)
// that Docker adds to non-TTY container logs, and removes terminal control
// sequences so a hostile container cannot inject ANSI escapes that hijack an
// operator's terminal when log output is pasted or rendered raw.
func sanitizeDockerLogLine(line string) string {
// Docker multiplexed stream: first 8 bytes are header (stream type + size).
// If the line starts with a non-printable byte followed by 0x00 0x00 0x00, strip 8 bytes.
if len(line) > 8 && (line[0] == 1 || line[0] == 2) && line[1] == 0 && line[2] == 0 && line[3] == 0 {
line = line[8:]
}
line = ansiOSCPattern.ReplaceAllString(line, "")
line = ansiCSIPattern.ReplaceAllString(line, "")
line = ctlBytePattern.ReplaceAllString(line, "")
return line
}
// buildActiveImagesSet returns the set of "image:tag" strings currently used
// by any container, computed in a single DB pass against the normalized
// containers index. Returning an error (rather than swallowing) prevents
// prune logic from treating a transient DB failure as "nothing is active".
func buildActiveImagesSet(st *store.Store, projects []store.Project) (map[string]bool, error) {
// `projects` is unused now — kept in the signature for back-compat with
// callers that already happen to have the slice. The image_ref column
// holds the full "image:tag" string written by the deployer.
_ = projects
containers, err := st.ListContainers(store.ContainerFilter{})
if err != nil {
return nil, fmt.Errorf("list containers: %w", err)
}
active := make(map[string]bool, len(containers))
for _, c := range containers {
if c.ImageRef == "" {
continue
}
active[c.ImageRef] = true
}
return active, nil
}
// unusedImageStats handles GET /api/docker/unused-images.
// Returns the total size of unused project images and whether the threshold is exceeded.
func (s *Server) unusedImageStats(w http.ResponseWriter, r *http.Request) {
if s.docker == nil {
respondJSON(w, http.StatusOK, map[string]any{
"total_size_mb": 0, "count": 0, "threshold_mb": 0, "exceeded": false,
})
return
}
settings, err := s.store.GetSettings()
if err != nil {
slog.Error("unused images: get settings", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
projects, err := s.store.GetAllProjects()
if err != nil {
slog.Error("unused images: list projects", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
// Build set of active image refs in one DB pass instead of N×K queries.
// A flaky read here previously masqueraded as "no images are active",
// which on the prune endpoint would have deleted *running* images.
activeImages, err := buildActiveImagesSet(s.store, projects)
if err != nil {
slog.Error("unused images: build active set", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
// Sum unused image sizes.
ctx := r.Context()
var totalSize int64
var count int
for _, p := range projects {
if p.Image == "" {
continue
}
images, err := s.docker.ListImagesByRef(ctx, p.Image)
if err != nil {
continue
}
for _, img := range images {
if !activeImages[img.Ref] {
totalSize += img.Size
count++
}
}
}
totalMB := totalSize / (1024 * 1024)
exceeded := settings.ImagePruneThresholdMB > 0 && int(totalMB) >= settings.ImagePruneThresholdMB
respondJSON(w, http.StatusOK, map[string]any{
"total_size_mb": totalMB,
"count": count,
"threshold_mb": settings.ImagePruneThresholdMB,
"exceeded": exceeded,
})
}
// pruneImages handles POST /api/docker/prune-images.
// Only removes images that belong to Tinyforge projects (not all system images).
func (s *Server) pruneImages(w http.ResponseWriter, r *http.Request) {
if s.docker == nil {
respondError(w, http.StatusServiceUnavailable, "Docker is not available")
return
}
// Collect all image references from our projects.
projects, err := s.store.GetAllProjects()
if err != nil {
slog.Error("prune: failed to list projects", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
// Build a set of image refs used by active instances. Bail out on error
// — silently treating a DB blip as "no active images" would prune
// images currently in use by running containers.
activeImages, err := buildActiveImagesSet(s.store, projects)
if err != nil {
slog.Error("prune: build active set", "error", err)
respondError(w, http.StatusInternalServerError, "internal server error")
return
}
// Collect all unique image bases from projects (without tags).
projectImages := make(map[string]bool)
for _, p := range projects {
if p.Image != "" {
projectImages[p.Image] = true
}
}
if len(projectImages) == 0 {
respondJSON(w, http.StatusOK, map[string]any{
"images_removed": 0,
"space_reclaimed_mb": 0,
"message": "No project images to clean up",
})
return
}
// List all local Docker images and find ones matching our projects but not actively used.
ctx := r.Context()
removed := 0
var reclaimedBytes int64
for imageBase := range projectImages {
// List all tags for this image.
images, err := s.docker.ListImagesByRef(ctx, imageBase)
if err != nil {
slog.Warn("prune: list images", "image", imageBase, "error", err)
continue
}
for _, img := range images {
// Skip images that are actively used by running instances.
if activeImages[img.Ref] {
continue
}
// Remove unused image.
if err := s.docker.RemoveImage(ctx, img.ID); err != nil {
slog.Warn("prune: remove image", "image", img.Ref, "error", err)
continue
}
removed++
reclaimedBytes += img.Size
slog.Info("prune: removed image", "ref", img.Ref, "size_mb", img.Size/(1024*1024))
}
}
respondJSON(w, http.StatusOK, map[string]any{
"images_removed": removed,
"space_reclaimed_mb": reclaimedBytes / (1024 * 1024),
})
}