package api import ( "bufio" "encoding/json" "fmt" "io" "log/slog" "net/http" "regexp" "strconv" "strings" "sync" "time" "github.com/alexei/tinyforge/internal/store" ) // Limits and constants for the log endpoints. const ( defaultLogTail = 200 maxLogTail = 5000 maxJSONLogBytes = 4 << 20 // 4 MiB cap for non-streaming log responses maxLogLineBytes = 1 << 20 // 1 MiB max line length for the bufio.Scanner logHeartbeatPeriod = 20 * time.Second ) // ANSI escape sequence patterns. Stripped from streamed log lines so a // hostile container cannot inject terminal control sequences (cursor moves, // hyperlink escapes, screen clears) into operator displays or pasted output. var ( ansiCSIPattern = regexp.MustCompile(`\x1b\[[0-9;?]*[ -/]*[@-~]`) ansiOSCPattern = regexp.MustCompile(`\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)`) ctlBytePattern = regexp.MustCompile(`[\x00-\x08\x0b-\x1a\x1c-\x1f\x7f]`) ) // streamLogsForContainer streams logs for an arbitrary container ID using the // shared SSE/JSON dual-mode pattern. Owner-specific handlers (workload-container) // should validate ownership and then delegate here. func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request, containerID string) { if s.docker == nil { respondError(w, http.StatusServiceUnavailable, "Docker is not available") return } tail := parseTailParam(r.URL.Query().Get("tail")) follow := r.URL.Query().Get("follow") == "true" // Check if client accepts SSE. accept := r.Header.Get("Accept") isSSE := strings.Contains(accept, "text/event-stream") logReader, err := s.docker.ContainerLogs(r.Context(), containerID, follow && isSSE, tail) if err != nil { slog.Error("failed to get container logs", "container", containerID, "error", err) respondError(w, http.StatusInternalServerError, "failed to get container logs") return } defer logReader.Close() if !isSSE { // JSON mode: cap the total bytes read so a chatty container with // tail=large cannot exhaust server memory. scanner := bufio.NewScanner(io.LimitReader(logReader, maxJSONLogBytes)) scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes) var lines []string for scanner.Scan() { line := sanitizeDockerLogLine(scanner.Text()) if line != "" { lines = append(lines, line) } } if lines == nil { lines = []string{} } respondJSON(w, http.StatusOK, lines) return } // SSE mode: stream lines as they arrive. release, ok := acquireSSESlot(w, s.sseGate) if !ok { return } defer release() flusher, ok := w.(http.Flusher) if !ok { respondError(w, http.StatusInternalServerError, "streaming not supported") return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") // Heartbeat keeps the connection warm through proxies that close idle // streams. Sent as an SSE comment which the EventSource API ignores. heartbeat := time.NewTicker(logHeartbeatPeriod) defer heartbeat.Stop() heartbeatDone := make(chan struct{}) defer close(heartbeatDone) var hbMu sync.Mutex go func() { for { select { case <-heartbeat.C: hbMu.Lock() _, _ = io.WriteString(w, ": ping\n\n") flusher.Flush() hbMu.Unlock() case <-heartbeatDone: return case <-r.Context().Done(): return } } }() scanner := bufio.NewScanner(logReader) scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes) for scanner.Scan() { line := sanitizeDockerLogLine(scanner.Text()) if line == "" { continue } data, _ := json.Marshal(map[string]string{"line": line}) hbMu.Lock() fmt.Fprintf(w, "data: %s\n\n", data) flusher.Flush() hbMu.Unlock() // Check if client disconnected. select { case <-r.Context().Done(): return default: } } } // parseTailParam validates and clamps the ?tail= query value. Empty/invalid // inputs fall back to the default; values above the cap are clamped down. // "all" is rejected — letting the caller request unbounded log history is a // trivial DoS vector. func parseTailParam(raw string) string { if raw == "" { return strconv.Itoa(defaultLogTail) } n, err := strconv.Atoi(raw) if err != nil || n <= 0 { return strconv.Itoa(defaultLogTail) } if n > maxLogTail { n = maxLogTail } return strconv.Itoa(n) } // sanitizeDockerLogLine strips the Docker log stream header (8-byte prefix) // that Docker adds to non-TTY container logs, and removes terminal control // sequences so a hostile container cannot inject ANSI escapes that hijack an // operator's terminal when log output is pasted or rendered raw. func sanitizeDockerLogLine(line string) string { // Docker multiplexed stream: first 8 bytes are header (stream type + size). // If the line starts with a non-printable byte followed by 0x00 0x00 0x00, strip 8 bytes. if len(line) > 8 && (line[0] == 1 || line[0] == 2) && line[1] == 0 && line[2] == 0 && line[3] == 0 { line = line[8:] } line = ansiOSCPattern.ReplaceAllString(line, "") line = ansiCSIPattern.ReplaceAllString(line, "") line = ctlBytePattern.ReplaceAllString(line, "") return line } // buildActiveImagesSet returns the set of "image:tag" strings currently used // by any container, computed in a single DB pass against the normalized // containers index. Returning an error (rather than swallowing) prevents // prune logic from treating a transient DB failure as "nothing is active". func buildActiveImagesSet(st *store.Store) (map[string]bool, error) { containers, err := st.ListContainers(store.ContainerFilter{}) if err != nil { return nil, fmt.Errorf("list containers: %w", err) } active := make(map[string]bool, len(containers)) for _, c := range containers { if c.ImageRef == "" { continue } active[c.ImageRef] = true } return active, nil } // workloadImageBases returns the set of "image" strings (no tag) that // some workload currently mounts to, derived from container.image_ref. // This replaces the legacy "list all projects → projects[].Image" view // after the workload-first cutover. func workloadImageBases(st *store.Store) (map[string]bool, error) { containers, err := st.ListContainers(store.ContainerFilter{}) if err != nil { return nil, fmt.Errorf("list containers: %w", err) } bases := make(map[string]bool, len(containers)) for _, c := range containers { if c.ImageRef == "" { continue } ref, _ := splitImageTag(c.ImageRef) if ref != "" { bases[ref] = true } } return bases, nil } // splitImageTag splits "image:tag" into image and tag parts. Returns the // full string and empty tag if no colon separator is found. Inlined here // because the legacy deploys.go that owned it was removed. func splitImageTag(ref string) (string, string) { if idx := strings.LastIndex(ref, ":"); idx != -1 { afterColon := ref[idx+1:] if !strings.Contains(afterColon, "/") { return ref[:idx], afterColon } } return ref, "" } // unusedImageStats handles GET /api/docker/unused-images. Returns the total // size of unused workload images and whether the threshold is exceeded. func (s *Server) unusedImageStats(w http.ResponseWriter, r *http.Request) { if s.docker == nil { respondJSON(w, http.StatusOK, map[string]any{ "total_size_mb": 0, "count": 0, "threshold_mb": 0, "exceeded": false, }) return } settings, err := s.store.GetSettings() if err != nil { slog.Error("unused images: get settings", "error", err) respondError(w, http.StatusInternalServerError, "internal server error") return } imageBases, err := workloadImageBases(s.store) if err != nil { slog.Error("unused images: list workload images", "error", err) respondError(w, http.StatusInternalServerError, "internal server error") return } activeImages, err := buildActiveImagesSet(s.store) if err != nil { slog.Error("unused images: build active set", "error", err) respondError(w, http.StatusInternalServerError, "internal server error") return } ctx := r.Context() var totalSize int64 var count int for base := range imageBases { images, err := s.docker.ListImagesByRef(ctx, base) if err != nil { continue } for _, img := range images { if !activeImages[img.Ref] { totalSize += img.Size count++ } } } totalMB := totalSize / (1024 * 1024) exceeded := settings.ImagePruneThresholdMB > 0 && int(totalMB) >= settings.ImagePruneThresholdMB respondJSON(w, http.StatusOK, map[string]any{ "total_size_mb": totalMB, "count": count, "threshold_mb": settings.ImagePruneThresholdMB, "exceeded": exceeded, }) } // pruneImages handles POST /api/docker/prune-images. Only removes images that // some workload references (via container.image_ref), never arbitrary host // images. func (s *Server) pruneImages(w http.ResponseWriter, r *http.Request) { if s.docker == nil { respondError(w, http.StatusServiceUnavailable, "Docker is not available") return } imageBases, err := workloadImageBases(s.store) if err != nil { slog.Error("prune: list workload images", "error", err) respondError(w, http.StatusInternalServerError, "internal server error") return } activeImages, err := buildActiveImagesSet(s.store) if err != nil { slog.Error("prune: build active set", "error", err) respondError(w, http.StatusInternalServerError, "internal server error") return } if len(imageBases) == 0 { respondJSON(w, http.StatusOK, map[string]any{ "images_removed": 0, "space_reclaimed_mb": 0, "message": "No workload images to clean up", }) return } ctx := r.Context() removed := 0 var reclaimedBytes int64 for base := range imageBases { images, err := s.docker.ListImagesByRef(ctx, base) if err != nil { slog.Warn("prune: list images", "image", base, "error", err) continue } for _, img := range images { if activeImages[img.Ref] { continue } if err := s.docker.RemoveImage(ctx, img.ID); err != nil { slog.Warn("prune: remove image", "image", img.Ref, "error", err) continue } removed++ reclaimedBytes += img.Size slog.Info("prune: removed image", "ref", img.Ref, "size_mb", img.Size/(1024*1024)) } } respondJSON(w, http.StatusOK, map[string]any{ "images_removed": removed, "space_reclaimed_mb": reclaimedBytes / (1024 * 1024), }) } // pruneBuildCache handles POST /api/docker/prune-build-cache. It removes // unused Docker build-cache records daemon-wide (all=false), so an app's next // rebuild still hits its warm cache. The build cache is regenerable by // definition — pruning only forces slower rebuilds, never data loss — and the // dockerfile/static deploy paths never reclaim it on teardown, so it grows // monotonically until pruned here. func (s *Server) pruneBuildCache(w http.ResponseWriter, r *http.Request) { if s.docker == nil { respondError(w, http.StatusServiceUnavailable, "Docker is not available") return } result, err := s.docker.PruneBuildCache(r.Context(), false) if err != nil { slog.Error("prune: build cache", "error", err) respondError(w, http.StatusInternalServerError, "internal server error") return } slog.Info("prune: build cache", "caches_deleted", result.CachesDeleted, "space_reclaimed_mb", result.SpaceReclaimed/(1024*1024)) respondJSON(w, http.StatusOK, map[string]any{ "caches_deleted": result.CachesDeleted, "space_reclaimed_mb": result.SpaceReclaimed / (1024 * 1024), }) }