d8ab22876f
Build / build (push) Successful in 10m41s
End-to-end extraction of the Instance concept. After this commit:
* internal/store/instances.go — DELETED
* internal/store/models.go — Instance struct gone, ProxyRoute moved here
* containers table is the single source of truth for project/stack/site
container state. instances table is dropped via DROP TABLE migration
(idempotent; re-runnable on every boot).
* Legacy tinyforge.project / tinyforge.stage / tinyforge.instance-id
Docker labels are no longer emitted; only tinyforge.workload.{id,kind},
tinyforge.role, and tinyforge.managed are stamped on new containers.
Backend rewrites:
- internal/deployer: executeDeploy + blueGreenDeploy + rollback +
promote use store.Container natively. New
removeContainer() replaces removeInstance().
enforceMaxInstances reads via
ListContainersByStageID.
- internal/reconciler: legacy tinyforge.instance-id dispatch removed;
upsertByWorkloadLabel now finds existing rows
by docker container ID first and falls back to
the deterministic workloadID:role key.
- internal/stale/scanner: Scan + new FindStaleContainers walk the
containers table; emit StaleContainer JSON.
- internal/stats/collector: ListContainers replaces ListAllInstances.
- internal/webhook/handler: workload-secret lookup tried first; falls back
to project / static_site secret column.
- internal/api: instances.go, stale.go, stats.go, stats_history.go,
projects.go, settings.go, docker.go, dns.go all read /
write through Container.
Docker layer:
- ManagedContainer exposes WorkloadID/Kind/Role from the canonical labels.
- ListContainers filters by tinyforge.managed=true.
- Network creation uses LabelManaged instead of LabelProject.
Frontend:
- Instance type is now a Container alias; .status → .state,
.last_alive_at → .last_seen_at.
- InstanceCard takes stageId as a prop (no longer derived from Instance).
- StaleContainer JSON shape rewritten: { container, workload_name, role,
days_stale }. StaleContainerCard + /containers/stale page updated.
- ProjectCard / homepage / SystemHealthCard filter by .state.
The migration loop now tolerates "no such table" alongside "duplicate
column" / "already exists" so obsolete ALTER TABLE entries targeting the
dropped instances table no-op cleanly on first boot.
Tests: store + deployer + reconciler + webhook + staticsite + notify all
still pass. Frontend svelte-check: zero errors.
420 lines
12 KiB
Go
420 lines
12 KiB
Go
package api
|
||
|
||
import (
|
||
"bufio"
|
||
"encoding/json"
|
||
"errors"
|
||
"fmt"
|
||
"io"
|
||
"log/slog"
|
||
"net/http"
|
||
"regexp"
|
||
"strconv"
|
||
"strings"
|
||
"sync"
|
||
"time"
|
||
|
||
"github.com/go-chi/chi/v5"
|
||
|
||
"github.com/alexei/tinyforge/internal/store"
|
||
)
|
||
|
||
// Tunables shared by the container-log endpoints.
const (
	// defaultLogTail is the line count used when ?tail= is empty or invalid.
	defaultLogTail = 200
	// maxLogTail is the ceiling that larger ?tail= values are clamped to.
	maxLogTail = 5000
	// maxJSONLogBytes bounds how much log data a non-streaming response reads (4 MiB).
	maxJSONLogBytes = 4 * 1024 * 1024
	// maxLogLineBytes is the longest single line the bufio.Scanner will accept (1 MiB).
	maxLogLineBytes = 1024 * 1024
	// logHeartbeatPeriod is the interval between SSE keep-alive comments.
	logHeartbeatPeriod = time.Second * 20
)
|
||
|
||
// Terminal escape patterns. They are stripped from log lines so a hostile
// container cannot inject terminal control sequences (cursor moves,
// hyperlink escapes, screen clears) into operator displays or pasted output.
var (
	// ansiCSIPattern matches a CSI sequence: ESC '[', any number of parameter
	// bytes (0x30-0x3F, i.e. digits and ":;<=>?"), any number of intermediate
	// bytes (0x20-0x2F), and one final byte (0x40-0x7E). Accepting the whole
	// parameter range covers colon-separated SGR colours and private-mode
	// prefixes such as '>' or '<', not just "0-9;?".
	ansiCSIPattern = regexp.MustCompile(`\x1b\[[0-?]*[ -/]*[@-~]`)
	// ansiOSCPattern matches an OSC sequence: ESC ']' followed by a payload
	// and terminated by either BEL or the two-byte ST (ESC '\').
	ansiOSCPattern = regexp.MustCompile(`\x1b\][^\x07\x1b]*(?:\x07|\x1b\\)`)
	// ctlBytePattern matches C0 control bytes and DEL, except TAB (0x09),
	// LF (0x0a) and ESC (0x1b).
	ctlBytePattern = regexp.MustCompile(`[\x00-\x08\x0b-\x1a\x1c-\x1f\x7f]`)
)
|
||
|
||
// listProjectImages handles GET /api/projects/{id}/images.
|
||
// Returns all local Docker images matching the project's image reference.
|
||
func (s *Server) listProjectImages(w http.ResponseWriter, r *http.Request) {
|
||
id := chi.URLParam(r, "id")
|
||
|
||
project, err := s.store.GetProjectByID(id)
|
||
if err != nil {
|
||
if errors.Is(err, store.ErrNotFound) {
|
||
respondNotFound(w, "project")
|
||
return
|
||
}
|
||
slog.Error("failed to get project", "error", err)
|
||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||
return
|
||
}
|
||
|
||
if s.docker == nil || project.Image == "" {
|
||
respondJSON(w, http.StatusOK, []any{})
|
||
return
|
||
}
|
||
|
||
images, err := s.docker.ListImagesByRef(r.Context(), project.Image)
|
||
if err != nil {
|
||
slog.Warn("list project images", "project", project.Name, "error", err)
|
||
respondJSON(w, http.StatusOK, []any{})
|
||
return
|
||
}
|
||
|
||
respondJSON(w, http.StatusOK, images)
|
||
}
|
||
|
||
// streamContainerLogs handles GET /api/projects/{id}/stages/{stage}/instances/{iid}/logs.
|
||
// Streams container logs via SSE. {iid} is the container row ID. Ownership is
|
||
// verified by joining through workload + stage so an attacker cannot stream
|
||
// logs for a foreign container by guessing IDs under the wrong project URL.
|
||
func (s *Server) streamContainerLogs(w http.ResponseWriter, r *http.Request) {
|
||
projectID := chi.URLParam(r, "id")
|
||
stageID := chi.URLParam(r, "stage")
|
||
containerRowID := chi.URLParam(r, "iid")
|
||
|
||
c, err := s.store.GetContainerByID(containerRowID)
|
||
if err != nil {
|
||
if errors.Is(err, store.ErrNotFound) {
|
||
respondNotFound(w, "container")
|
||
return
|
||
}
|
||
slog.Error("failed to get container", "error", err)
|
||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||
return
|
||
}
|
||
|
||
wl, err := s.store.GetWorkloadByID(c.WorkloadID)
|
||
if err != nil {
|
||
respondNotFound(w, "container")
|
||
return
|
||
}
|
||
stage, err := s.store.GetStageByID(stageID)
|
||
if err != nil || stage.ProjectID != projectID {
|
||
respondNotFound(w, "container")
|
||
return
|
||
}
|
||
if wl.Kind != string(store.WorkloadKindProject) || wl.RefID != projectID || c.Role != stage.Name {
|
||
respondNotFound(w, "container")
|
||
return
|
||
}
|
||
|
||
if c.ContainerID == "" {
|
||
respondError(w, http.StatusBadRequest, "container row has no docker container bound")
|
||
return
|
||
}
|
||
|
||
s.streamLogsForContainer(w, r, c.ContainerID)
|
||
}
|
||
|
||
// streamLogsForContainer streams logs for an arbitrary container ID using the
|
||
// shared SSE/JSON dual-mode pattern. Owner-specific handlers (instance, site)
|
||
// should validate ownership and then delegate here.
|
||
func (s *Server) streamLogsForContainer(w http.ResponseWriter, r *http.Request, containerID string) {
|
||
if s.docker == nil {
|
||
respondError(w, http.StatusServiceUnavailable, "Docker is not available")
|
||
return
|
||
}
|
||
|
||
tail := parseTailParam(r.URL.Query().Get("tail"))
|
||
follow := r.URL.Query().Get("follow") == "true"
|
||
|
||
// Check if client accepts SSE.
|
||
accept := r.Header.Get("Accept")
|
||
isSSE := strings.Contains(accept, "text/event-stream")
|
||
|
||
logReader, err := s.docker.ContainerLogs(r.Context(), containerID, follow && isSSE, tail)
|
||
if err != nil {
|
||
slog.Error("failed to get container logs", "container", containerID, "error", err)
|
||
respondError(w, http.StatusInternalServerError, "failed to get container logs")
|
||
return
|
||
}
|
||
defer logReader.Close()
|
||
|
||
if !isSSE {
|
||
// JSON mode: cap the total bytes read so a chatty container with
|
||
// tail=large cannot exhaust server memory.
|
||
scanner := bufio.NewScanner(io.LimitReader(logReader, maxJSONLogBytes))
|
||
scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
|
||
var lines []string
|
||
for scanner.Scan() {
|
||
line := sanitizeDockerLogLine(scanner.Text())
|
||
if line != "" {
|
||
lines = append(lines, line)
|
||
}
|
||
}
|
||
if lines == nil {
|
||
lines = []string{}
|
||
}
|
||
respondJSON(w, http.StatusOK, lines)
|
||
return
|
||
}
|
||
|
||
// SSE mode: stream lines as they arrive.
|
||
release, ok := acquireSSESlot(w, s.sseGate)
|
||
if !ok {
|
||
return
|
||
}
|
||
defer release()
|
||
|
||
flusher, ok := w.(http.Flusher)
|
||
if !ok {
|
||
respondError(w, http.StatusInternalServerError, "streaming not supported")
|
||
return
|
||
}
|
||
|
||
w.Header().Set("Content-Type", "text/event-stream")
|
||
w.Header().Set("Cache-Control", "no-cache")
|
||
w.Header().Set("Connection", "keep-alive")
|
||
|
||
// Heartbeat keeps the connection warm through proxies that close idle
|
||
// streams. Sent as an SSE comment which the EventSource API ignores.
|
||
heartbeat := time.NewTicker(logHeartbeatPeriod)
|
||
defer heartbeat.Stop()
|
||
heartbeatDone := make(chan struct{})
|
||
defer close(heartbeatDone)
|
||
var hbMu sync.Mutex
|
||
go func() {
|
||
for {
|
||
select {
|
||
case <-heartbeat.C:
|
||
hbMu.Lock()
|
||
_, _ = io.WriteString(w, ": ping\n\n")
|
||
flusher.Flush()
|
||
hbMu.Unlock()
|
||
case <-heartbeatDone:
|
||
return
|
||
case <-r.Context().Done():
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
scanner := bufio.NewScanner(logReader)
|
||
scanner.Buffer(make([]byte, 0, 64*1024), maxLogLineBytes)
|
||
for scanner.Scan() {
|
||
line := sanitizeDockerLogLine(scanner.Text())
|
||
if line == "" {
|
||
continue
|
||
}
|
||
|
||
data, _ := json.Marshal(map[string]string{"line": line})
|
||
hbMu.Lock()
|
||
fmt.Fprintf(w, "data: %s\n\n", data)
|
||
flusher.Flush()
|
||
hbMu.Unlock()
|
||
|
||
// Check if client disconnected.
|
||
select {
|
||
case <-r.Context().Done():
|
||
return
|
||
default:
|
||
}
|
||
}
|
||
}
|
||
|
||
// parseTailParam validates and clamps the ?tail= query value. Empty/invalid
|
||
// inputs fall back to the default; values above the cap are clamped down.
|
||
// "all" is rejected — letting the caller request unbounded log history is a
|
||
// trivial DoS vector.
|
||
func parseTailParam(raw string) string {
|
||
if raw == "" {
|
||
return strconv.Itoa(defaultLogTail)
|
||
}
|
||
n, err := strconv.Atoi(raw)
|
||
if err != nil || n <= 0 {
|
||
return strconv.Itoa(defaultLogTail)
|
||
}
|
||
if n > maxLogTail {
|
||
n = maxLogTail
|
||
}
|
||
return strconv.Itoa(n)
|
||
}
|
||
|
||
// sanitizeDockerLogLine strips the Docker log stream header (8-byte prefix)
|
||
// that Docker adds to non-TTY container logs, and removes terminal control
|
||
// sequences so a hostile container cannot inject ANSI escapes that hijack an
|
||
// operator's terminal when log output is pasted or rendered raw.
|
||
func sanitizeDockerLogLine(line string) string {
|
||
// Docker multiplexed stream: first 8 bytes are header (stream type + size).
|
||
// If the line starts with a non-printable byte followed by 0x00 0x00 0x00, strip 8 bytes.
|
||
if len(line) > 8 && (line[0] == 1 || line[0] == 2) && line[1] == 0 && line[2] == 0 && line[3] == 0 {
|
||
line = line[8:]
|
||
}
|
||
line = ansiOSCPattern.ReplaceAllString(line, "")
|
||
line = ansiCSIPattern.ReplaceAllString(line, "")
|
||
line = ctlBytePattern.ReplaceAllString(line, "")
|
||
return line
|
||
}
|
||
|
||
// buildActiveImagesSet returns the set of "image:tag" strings currently used
|
||
// by any container, computed in a single DB pass against the normalized
|
||
// containers index. Returning an error (rather than swallowing) prevents
|
||
// prune logic from treating a transient DB failure as "nothing is active".
|
||
func buildActiveImagesSet(st *store.Store, projects []store.Project) (map[string]bool, error) {
|
||
// `projects` is unused now — kept in the signature for back-compat with
|
||
// callers that already happen to have the slice. The image_ref column
|
||
// holds the full "image:tag" string written by the deployer.
|
||
_ = projects
|
||
containers, err := st.ListContainers(store.ContainerFilter{})
|
||
if err != nil {
|
||
return nil, fmt.Errorf("list containers: %w", err)
|
||
}
|
||
active := make(map[string]bool, len(containers))
|
||
for _, c := range containers {
|
||
if c.ImageRef == "" {
|
||
continue
|
||
}
|
||
active[c.ImageRef] = true
|
||
}
|
||
return active, nil
|
||
}
|
||
|
||
// unusedImageStats handles GET /api/docker/unused-images.
|
||
// Returns the total size of unused project images and whether the threshold is exceeded.
|
||
func (s *Server) unusedImageStats(w http.ResponseWriter, r *http.Request) {
|
||
if s.docker == nil {
|
||
respondJSON(w, http.StatusOK, map[string]any{
|
||
"total_size_mb": 0, "count": 0, "threshold_mb": 0, "exceeded": false,
|
||
})
|
||
return
|
||
}
|
||
|
||
settings, err := s.store.GetSettings()
|
||
if err != nil {
|
||
slog.Error("unused images: get settings", "error", err)
|
||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||
return
|
||
}
|
||
|
||
projects, err := s.store.GetAllProjects()
|
||
if err != nil {
|
||
slog.Error("unused images: list projects", "error", err)
|
||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||
return
|
||
}
|
||
|
||
// Build set of active image refs in one DB pass instead of N×K queries.
|
||
// A flaky read here previously masqueraded as "no images are active",
|
||
// which on the prune endpoint would have deleted *running* images.
|
||
activeImages, err := buildActiveImagesSet(s.store, projects)
|
||
if err != nil {
|
||
slog.Error("unused images: build active set", "error", err)
|
||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||
return
|
||
}
|
||
|
||
// Sum unused image sizes.
|
||
ctx := r.Context()
|
||
var totalSize int64
|
||
var count int
|
||
for _, p := range projects {
|
||
if p.Image == "" {
|
||
continue
|
||
}
|
||
images, err := s.docker.ListImagesByRef(ctx, p.Image)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
for _, img := range images {
|
||
if !activeImages[img.Ref] {
|
||
totalSize += img.Size
|
||
count++
|
||
}
|
||
}
|
||
}
|
||
|
||
totalMB := totalSize / (1024 * 1024)
|
||
exceeded := settings.ImagePruneThresholdMB > 0 && int(totalMB) >= settings.ImagePruneThresholdMB
|
||
|
||
respondJSON(w, http.StatusOK, map[string]any{
|
||
"total_size_mb": totalMB,
|
||
"count": count,
|
||
"threshold_mb": settings.ImagePruneThresholdMB,
|
||
"exceeded": exceeded,
|
||
})
|
||
}
|
||
|
||
// pruneImages handles POST /api/docker/prune-images.
|
||
// Only removes images that belong to Tinyforge projects (not all system images).
|
||
func (s *Server) pruneImages(w http.ResponseWriter, r *http.Request) {
|
||
if s.docker == nil {
|
||
respondError(w, http.StatusServiceUnavailable, "Docker is not available")
|
||
return
|
||
}
|
||
|
||
// Collect all image references from our projects.
|
||
projects, err := s.store.GetAllProjects()
|
||
if err != nil {
|
||
slog.Error("prune: failed to list projects", "error", err)
|
||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||
return
|
||
}
|
||
|
||
// Build a set of image refs used by active instances. Bail out on error
|
||
// — silently treating a DB blip as "no active images" would prune
|
||
// images currently in use by running containers.
|
||
activeImages, err := buildActiveImagesSet(s.store, projects)
|
||
if err != nil {
|
||
slog.Error("prune: build active set", "error", err)
|
||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||
return
|
||
}
|
||
|
||
// Collect all unique image bases from projects (without tags).
|
||
projectImages := make(map[string]bool)
|
||
for _, p := range projects {
|
||
if p.Image != "" {
|
||
projectImages[p.Image] = true
|
||
}
|
||
}
|
||
|
||
if len(projectImages) == 0 {
|
||
respondJSON(w, http.StatusOK, map[string]any{
|
||
"images_removed": 0,
|
||
"space_reclaimed_mb": 0,
|
||
"message": "No project images to clean up",
|
||
})
|
||
return
|
||
}
|
||
|
||
// List all local Docker images and find ones matching our projects but not actively used.
|
||
ctx := r.Context()
|
||
removed := 0
|
||
var reclaimedBytes int64
|
||
|
||
for imageBase := range projectImages {
|
||
// List all tags for this image.
|
||
images, err := s.docker.ListImagesByRef(ctx, imageBase)
|
||
if err != nil {
|
||
slog.Warn("prune: list images", "image", imageBase, "error", err)
|
||
continue
|
||
}
|
||
|
||
for _, img := range images {
|
||
// Skip images that are actively used by running instances.
|
||
if activeImages[img.Ref] {
|
||
continue
|
||
}
|
||
|
||
// Remove unused image.
|
||
if err := s.docker.RemoveImage(ctx, img.ID); err != nil {
|
||
slog.Warn("prune: remove image", "image", img.Ref, "error", err)
|
||
continue
|
||
}
|
||
removed++
|
||
reclaimedBytes += img.Size
|
||
slog.Info("prune: removed image", "ref", img.Ref, "size_mb", img.Size/(1024*1024))
|
||
}
|
||
}
|
||
|
||
respondJSON(w, http.StatusOK, map[string]any{
|
||
"images_removed": removed,
|
||
"space_reclaimed_mb": reclaimedBytes / (1024 * 1024),
|
||
})
|
||
}
|