Add per-workload capture of host-bind data volumes as downloadable tar.gz archives: a new internal/volsnap engine (enumerate host-bind volumes via the computeMounts merge, archive with archive/tar+gzip skipping symlinks/special files, per-workload retention + startup orphan cleanup), a volume_snapshots table + store CRUD, admin-gated API (list/snapshotable/create/download/delete), and a Snapshots panel on /apps/[id] that shows coverage and which volumes are skipped (and why). Scope: image-source apps, host-bind scopes (absolute/stage/project); Docker named volumes, tmpfs, and instance scope are surfaced as not-yet-supported. Restore is a separate later phase. Download/FilePath are containment-checked; create returns a typed no-data error (400) vs generic 500. Covered by archiver unit tests + full API e2e.
This commit is contained in:
@@ -19,6 +19,7 @@ import (
|
||||
"github.com/alexei/tinyforge/internal/proxy"
|
||||
"github.com/alexei/tinyforge/internal/stale"
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
"github.com/alexei/tinyforge/internal/volsnap"
|
||||
"github.com/alexei/tinyforge/internal/webhook"
|
||||
"github.com/alexei/tinyforge/internal/workload/plugin"
|
||||
)
|
||||
@@ -56,6 +57,7 @@ type Server struct {
|
||||
onDNSProviderChanged DNSProviderChangedFunc
|
||||
|
||||
backupEngine *backup.Engine
|
||||
snapshotEngine *volsnap.Engine
|
||||
sseGate *sseGate
|
||||
logScanReloader LogScanReloader
|
||||
dbPath string
|
||||
@@ -119,6 +121,11 @@ func (s *Server) SetBackupEngine(engine *backup.Engine) {
|
||||
s.backupEngine = engine
|
||||
}
|
||||
|
||||
// SetSnapshotEngine sets the volume-snapshot engine on the server.
|
||||
func (s *Server) SetSnapshotEngine(engine *volsnap.Engine) {
|
||||
s.snapshotEngine = engine
|
||||
}
|
||||
|
||||
// SetDBPath sets the database file path (needed for restore).
|
||||
func (s *Server) SetDBPath(path string) {
|
||||
s.dbPath = path
|
||||
@@ -329,6 +336,13 @@ func (s *Server) Router() chi.Router {
|
||||
r.With(auth.AdminOnly).Post("/start", s.startPluginWorkload)
|
||||
r.With(auth.AdminOnly).Delete("/", s.deletePluginWorkload)
|
||||
|
||||
// Volume snapshots (admin-only). Capture/list a workload's
|
||||
// host-bind data volumes; {sid}-scoped download/delete live
|
||||
// in the global admin group alongside backups.
|
||||
r.With(auth.AdminOnly).Get("/snapshots", s.listWorkloadSnapshots)
|
||||
r.With(auth.AdminOnly).Get("/snapshotable", s.getWorkloadSnapshotable)
|
||||
r.With(auth.AdminOnly).Post("/snapshots", s.createWorkloadSnapshot)
|
||||
|
||||
// Runtime view: per-source persisted state + storage usage.
|
||||
// Read-only; safe for any authenticated user.
|
||||
r.Get("/runtime-state", s.getWorkloadRuntimeState)
|
||||
@@ -519,6 +533,11 @@ func (s *Server) Router() chi.Router {
|
||||
r.Get("/backups/{id}/download", s.downloadBackup)
|
||||
r.Delete("/backups/{id}", s.deleteBackup)
|
||||
r.Post("/backups/{id}/restore", s.restoreBackup)
|
||||
|
||||
// Volume-snapshot download/delete (workload-scoped capture +
|
||||
// list live under /workloads/{id}/snapshots).
|
||||
r.Get("/snapshots/{sid}/download", s.downloadSnapshot)
|
||||
r.Delete("/snapshots/{sid}", s.deleteSnapshot)
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
@@ -0,0 +1,177 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"log/slog"
|
||||
"net/http"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
"github.com/alexei/tinyforge/internal/volsnap"
|
||||
)
|
||||
|
||||
// listWorkloadSnapshots handles GET /api/workloads/{id}/snapshots.
|
||||
func (s *Server) listWorkloadSnapshots(w http.ResponseWriter, r *http.Request) {
|
||||
if s.snapshotEngine == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "snapshot engine not initialized")
|
||||
return
|
||||
}
|
||||
id := chi.URLParam(r, "id")
|
||||
snaps, err := s.snapshotEngine.List(id)
|
||||
if err != nil {
|
||||
slog.Error("snapshots: list", "workload", id, "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, snaps)
|
||||
}
|
||||
|
||||
// snapshotableVolume is the sanitized view of a volume in the snapshotable
|
||||
// response — it omits the resolved host path so internal layout is not leaked.
|
||||
type snapshotableVolume struct {
|
||||
Target string `json:"target"`
|
||||
Scope string `json:"scope"`
|
||||
Source string `json:"source"`
|
||||
}
|
||||
|
||||
// getWorkloadSnapshotable handles GET /api/workloads/{id}/snapshotable. It
|
||||
// tells the UI which volumes can be snapshotted and which are skipped (and
|
||||
// why), so users are never misled about coverage.
|
||||
func (s *Server) getWorkloadSnapshotable(w http.ResponseWriter, r *http.Request) {
|
||||
if s.snapshotEngine == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "snapshot engine not initialized")
|
||||
return
|
||||
}
|
||||
id := chi.URLParam(r, "id")
|
||||
workload, err := s.store.GetWorkloadByID(id)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusNotFound, "workload not found")
|
||||
return
|
||||
}
|
||||
settings, err := s.store.GetSettings()
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||||
return
|
||||
}
|
||||
refs, skipped, err := volsnap.SnapshotableVolumes(s.store, workload, settings)
|
||||
if err != nil {
|
||||
slog.Error("snapshots: enumerate", "workload", id, "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||||
return
|
||||
}
|
||||
|
||||
volumes := make([]snapshotableVolume, 0, len(refs))
|
||||
for _, ref := range refs {
|
||||
volumes = append(volumes, snapshotableVolume{Target: ref.Target, Scope: ref.Scope, Source: ref.Source})
|
||||
}
|
||||
if skipped == nil {
|
||||
skipped = []volsnap.SkippedVolume{}
|
||||
}
|
||||
respondJSON(w, http.StatusOK, map[string]any{
|
||||
"volumes": volumes,
|
||||
"skipped": skipped,
|
||||
})
|
||||
}
|
||||
|
||||
// createWorkloadSnapshot handles POST /api/workloads/{id}/snapshots.
|
||||
func (s *Server) createWorkloadSnapshot(w http.ResponseWriter, r *http.Request) {
|
||||
if s.snapshotEngine == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "snapshot engine not initialized")
|
||||
return
|
||||
}
|
||||
id := chi.URLParam(r, "id")
|
||||
workload, err := s.store.GetWorkloadByID(id)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusNotFound, "workload not found")
|
||||
return
|
||||
}
|
||||
settings, err := s.store.GetSettings()
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||||
return
|
||||
}
|
||||
|
||||
var body struct {
|
||||
Label string `json:"label"`
|
||||
}
|
||||
if r.ContentLength != 0 {
|
||||
if err := json.NewDecoder(io.LimitReader(r.Body, 1<<20)).Decode(&body); err != nil && !errors.Is(err, io.EOF) {
|
||||
respondError(w, http.StatusBadRequest, "invalid JSON body")
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
snap, err := s.snapshotEngine.Create(workload, settings, body.Label)
|
||||
if err != nil {
|
||||
// "no snapshottable volume data" is client-actionable (400, safe to
|
||||
// echo). Any other error is server-side: log the detail, return a
|
||||
// generic 500 so internal paths / DB text never reach the client.
|
||||
if errors.Is(err, volsnap.ErrNoSnapshotData) {
|
||||
respondError(w, http.StatusBadRequest, err.Error())
|
||||
return
|
||||
}
|
||||
slog.Error("snapshots: create", "workload", id, "error", err)
|
||||
respondError(w, http.StatusInternalServerError, "internal server error")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusCreated, snap)
|
||||
}
|
||||
|
||||
// deleteSnapshot handles DELETE /api/snapshots/{sid}.
|
||||
func (s *Server) deleteSnapshot(w http.ResponseWriter, r *http.Request) {
|
||||
if s.snapshotEngine == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "snapshot engine not initialized")
|
||||
return
|
||||
}
|
||||
sid := chi.URLParam(r, "sid")
|
||||
if err := s.snapshotEngine.Delete(sid); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondError(w, http.StatusNotFound, "snapshot not found")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "failed to delete snapshot")
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, map[string]string{"status": "deleted"})
|
||||
}
|
||||
|
||||
// downloadSnapshot handles GET /api/snapshots/{sid}/download, streaming the
|
||||
// tar.gz archive. The resolved path is containment-checked against the
|
||||
// snapshot directory.
|
||||
func (s *Server) downloadSnapshot(w http.ResponseWriter, r *http.Request) {
|
||||
if s.snapshotEngine == nil {
|
||||
respondError(w, http.StatusServiceUnavailable, "snapshot engine not initialized")
|
||||
return
|
||||
}
|
||||
sid := chi.URLParam(r, "sid")
|
||||
snap, err := s.snapshotEngine.Get(sid)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusNotFound, "snapshot not found")
|
||||
return
|
||||
}
|
||||
path, err := s.snapshotEngine.FilePath(snap)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusForbidden, "access denied")
|
||||
return
|
||||
}
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusNotFound, "snapshot file not found on disk")
|
||||
return
|
||||
}
|
||||
defer f.Close()
|
||||
stat, err := f.Stat()
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "failed to read snapshot file")
|
||||
return
|
||||
}
|
||||
name := filepath.Base(snap.Filename)
|
||||
w.Header().Set("Content-Type", "application/gzip")
|
||||
w.Header().Set("Content-Disposition", "attachment; filename=\""+name+"\"")
|
||||
http.ServeContent(w, r, name, stat.ModTime(), f)
|
||||
}
|
||||
@@ -0,0 +1,178 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"io"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/auth"
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
"github.com/alexei/tinyforge/internal/volsnap"
|
||||
"github.com/alexei/tinyforge/internal/webhook"
|
||||
)
|
||||
|
||||
// newSnapshotEnv builds an API test env with the volume-snapshot engine wired
|
||||
// (the shared newAPITestEnv does not wire it). dataDir holds the snapshot
|
||||
// archives; baseVol is where host-bind volume directories resolve.
|
||||
func newSnapshotEnv(t *testing.T) (*apiTestEnv, string) {
|
||||
t.Helper()
|
||||
st, err := store.New(":memory:")
|
||||
if err != nil {
|
||||
t.Fatalf("create store: %v", err)
|
||||
}
|
||||
t.Cleanup(func() { st.Close() })
|
||||
|
||||
encKey := [32]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
|
||||
dispatcher := &fakeAPIDispatcher{}
|
||||
wh := webhook.NewHandler(st)
|
||||
wh.SetPluginDispatcher(dispatcher)
|
||||
srv := NewServer(st, nil, nil, nil, dispatcher, nil, wh, nil, encKey)
|
||||
|
||||
snapEng, err := volsnap.New(st, t.TempDir())
|
||||
if err != nil {
|
||||
t.Fatalf("snapshot engine: %v", err)
|
||||
}
|
||||
srv.SetSnapshotEngine(snapEng)
|
||||
|
||||
httpsrv := httptest.NewServer(srv.Router())
|
||||
t.Cleanup(httpsrv.Close)
|
||||
|
||||
la := auth.NewLocalAuth(encKey)
|
||||
tok, err := la.GenerateToken(auth.Claims{UserID: "u-admin", Username: "admin", Role: "admin"})
|
||||
if err != nil {
|
||||
t.Fatalf("mint token: %v", err)
|
||||
}
|
||||
|
||||
baseVol := t.TempDir()
|
||||
settings, _ := st.GetSettings()
|
||||
settings.BaseVolumePath = baseVol
|
||||
if err := st.UpdateSettings(settings); err != nil {
|
||||
t.Fatalf("update settings: %v", err)
|
||||
}
|
||||
|
||||
return &apiTestEnv{srv: httpsrv, store: st, dispatcher: dispatcher, adminToken: tok.Token, encKey: encKey}, baseVol
|
||||
}
|
||||
|
||||
func TestVolumeSnapshots_EndToEnd(t *testing.T) {
|
||||
e, baseVol := newSnapshotEnv(t)
|
||||
|
||||
w, err := e.store.CreateWorkload(store.Workload{
|
||||
Name: "data-app",
|
||||
Kind: "project",
|
||||
SourceKind: "image",
|
||||
SourceConfig: `{"image":"registry.example.com/owner/app","port":8080}`,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create workload: %v", err)
|
||||
}
|
||||
if _, err := e.store.SetWorkloadVolume(store.WorkloadVolume{
|
||||
WorkloadID: w.ID, Target: "/data", Source: "data", Scope: "project",
|
||||
}); err != nil {
|
||||
t.Fatalf("set volume: %v", err)
|
||||
}
|
||||
|
||||
// Materialize the resolved host-bind dir with a file so there is data to
|
||||
// capture. Layout mirrors ResolveWorkloadPath for project scope:
|
||||
// <baseVol>/<name>-<id8>/<source>.
|
||||
id8 := w.ID
|
||||
if len(id8) > 8 {
|
||||
id8 = id8[:8]
|
||||
}
|
||||
hostDir := filepath.Join(baseVol, "data-app-"+id8, "data")
|
||||
if err := os.MkdirAll(hostDir, 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(hostDir, "payload.txt"), []byte("important"), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// snapshotable lists the one host-bind volume.
|
||||
resp := e.do(t, http.MethodGet, "/api/workloads/"+w.ID+"/snapshotable", nil)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("snapshotable status = %d", resp.StatusCode)
|
||||
}
|
||||
var snapable struct {
|
||||
Volumes []map[string]string `json:"volumes"`
|
||||
Skipped []map[string]string `json:"skipped"`
|
||||
}
|
||||
decodeEnvelope(t, resp, &snapable)
|
||||
if len(snapable.Volumes) != 1 || snapable.Volumes[0]["target"] != "/data" {
|
||||
t.Fatalf("expected 1 snapshotable volume /data, got %+v", snapable)
|
||||
}
|
||||
|
||||
// Create a snapshot.
|
||||
resp = e.do(t, http.MethodPost, "/api/workloads/"+w.ID+"/snapshots", map[string]string{"label": "before upgrade"})
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
t.Fatalf("create snapshot status = %d", resp.StatusCode)
|
||||
}
|
||||
var snap store.VolumeSnapshot
|
||||
decodeEnvelope(t, resp, &snap)
|
||||
if snap.ID == "" || snap.SizeBytes == 0 || snap.Label != "before upgrade" {
|
||||
t.Fatalf("unexpected snapshot: %+v", snap)
|
||||
}
|
||||
|
||||
// It appears in the list.
|
||||
resp = e.do(t, http.MethodGet, "/api/workloads/"+w.ID+"/snapshots", nil)
|
||||
var list []store.VolumeSnapshot
|
||||
decodeEnvelope(t, resp, &list)
|
||||
if len(list) != 1 || list[0].ID != snap.ID {
|
||||
t.Fatalf("expected 1 snapshot in list, got %+v", list)
|
||||
}
|
||||
|
||||
// Download streams a non-empty gzip archive (not the JSON envelope).
|
||||
resp = e.do(t, http.MethodGet, "/api/snapshots/"+snap.ID+"/download", nil)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("download status = %d", resp.StatusCode)
|
||||
}
|
||||
if ct := resp.Header.Get("Content-Type"); ct != "application/gzip" {
|
||||
t.Errorf("download content-type = %q, want application/gzip", ct)
|
||||
}
|
||||
data, _ := io.ReadAll(resp.Body)
|
||||
resp.Body.Close()
|
||||
if len(data) == 0 {
|
||||
t.Error("download body is empty")
|
||||
}
|
||||
|
||||
// Delete removes it.
|
||||
resp = e.do(t, http.MethodDelete, "/api/snapshots/"+snap.ID, nil)
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
t.Fatalf("delete status = %d", resp.StatusCode)
|
||||
}
|
||||
resp = e.do(t, http.MethodGet, "/api/workloads/"+w.ID+"/snapshots", nil)
|
||||
var after []store.VolumeSnapshot
|
||||
decodeEnvelope(t, resp, &after)
|
||||
if len(after) != 0 {
|
||||
t.Fatalf("expected 0 snapshots after delete, got %d", len(after))
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateSnapshot_NoVolumeData_Returns400(t *testing.T) {
|
||||
e, _ := newSnapshotEnv(t)
|
||||
w, err := e.store.CreateWorkload(store.Workload{
|
||||
Name: "no-vol-app",
|
||||
Kind: "project",
|
||||
SourceKind: "image",
|
||||
SourceConfig: `{"image":"x","port":80}`,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("create workload: %v", err)
|
||||
}
|
||||
resp := e.do(t, http.MethodPost, "/api/workloads/"+w.ID+"/snapshots", nil)
|
||||
if resp.StatusCode != http.StatusBadRequest {
|
||||
t.Fatalf("expected 400 for an app with no snapshottable volumes, got %d", resp.StatusCode)
|
||||
}
|
||||
resp.Body.Close()
|
||||
}
|
||||
|
||||
func TestSnapshotEndpoints_RequireWorkload(t *testing.T) {
|
||||
e, _ := newSnapshotEnv(t)
|
||||
// snapshotable on an unknown workload → 404.
|
||||
resp := e.do(t, http.MethodGet, "/api/workloads/does-not-exist/snapshotable", nil)
|
||||
if resp.StatusCode != http.StatusNotFound {
|
||||
t.Fatalf("snapshotable unknown workload = %d, want 404", resp.StatusCode)
|
||||
}
|
||||
resp.Body.Close()
|
||||
}
|
||||
@@ -91,6 +91,21 @@ type Backup struct {
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// VolumeSnapshot is one captured archive of a workload's host-bind data
|
||||
// volumes. Unlike Backup (global, SQLite-specific) it is per-workload and the
|
||||
// archive is a tar.gz of the resolved volume directories. Manifest is a
|
||||
// JSON-encoded []SnapshotVolume describing what the archive covers, so a
|
||||
// future restore can re-resolve each target even if volume settings drift.
|
||||
type VolumeSnapshot struct {
|
||||
ID string `json:"id"`
|
||||
WorkloadID string `json:"workload_id"`
|
||||
Label string `json:"label"`
|
||||
Filename string `json:"filename"`
|
||||
SizeBytes int64 `json:"size_bytes"`
|
||||
Manifest string `json:"manifest"` // JSON []SnapshotVolume
|
||||
CreatedAt string `json:"created_at"`
|
||||
}
|
||||
|
||||
// DNSRecord tracks a DNS record managed by the application.
|
||||
type DNSRecord struct {
|
||||
ID string `json:"id"`
|
||||
@@ -164,11 +179,11 @@ type WorkloadEnv struct {
|
||||
// by image cfg.Env and workload_env).
|
||||
type SharedSecret struct {
|
||||
ID string `json:"id"`
|
||||
Name string `json:"name"` // the env KEY
|
||||
Value string `json:"value"` // ciphertext when Encrypted; never returned decrypted by the API
|
||||
Name string `json:"name"` // the env KEY
|
||||
Value string `json:"value"` // ciphertext when Encrypted; never returned decrypted by the API
|
||||
Encrypted bool `json:"encrypted"`
|
||||
Scope string `json:"scope"` // global | app
|
||||
AppID string `json:"app_id"` // set when scope == app; "" for global
|
||||
Scope string `json:"scope"` // global | app
|
||||
AppID string `json:"app_id"` // set when scope == app; "" for global
|
||||
Description string `json:"description"`
|
||||
Enabled bool `json:"enabled"`
|
||||
CreatedAt string `json:"created_at"`
|
||||
|
||||
@@ -284,6 +284,20 @@ func (s *Store) runMigrations() error {
|
||||
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
|
||||
UNIQUE(workload_id, target)
|
||||
)`,
|
||||
// volume_snapshots: per-workload archives of host-bind data
|
||||
// volumes (tar.gz). Mirrors the backups table shape but scoped to a
|
||||
// workload and self-describing via the manifest column so a restore
|
||||
// can re-resolve each target. ON DELETE CASCADE so deleting an app
|
||||
// drops its snapshot rows (the files are pruned separately).
|
||||
`CREATE TABLE IF NOT EXISTS volume_snapshots (
|
||||
id TEXT PRIMARY KEY,
|
||||
workload_id TEXT NOT NULL REFERENCES workloads(id) ON DELETE CASCADE,
|
||||
label TEXT NOT NULL DEFAULT '',
|
||||
filename TEXT NOT NULL,
|
||||
size_bytes INTEGER NOT NULL DEFAULT 0,
|
||||
manifest TEXT NOT NULL DEFAULT '[]',
|
||||
created_at TEXT NOT NULL DEFAULT (datetime('now'))
|
||||
)`,
|
||||
// triggers: first-class redeploy signal sources. Webhook secrets
|
||||
// move from workload onto the trigger so one webhook URL can fan
|
||||
// out to multiple workloads via workload_trigger_bindings.
|
||||
@@ -493,6 +507,7 @@ func (s *Store) runMigrations() error {
|
||||
`CREATE INDEX IF NOT EXISTS idx_containers_stage_id ON containers(stage_id) WHERE stage_id != ''`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_workload_env_workload ON workload_env(workload_id)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_workload_volumes_workload ON workload_volumes(workload_id)`,
|
||||
`CREATE INDEX IF NOT EXISTS idx_volume_snapshots_workload ON volume_snapshots(workload_id)`,
|
||||
// Trigger-split indexes.
|
||||
`CREATE INDEX IF NOT EXISTS idx_triggers_kind ON triggers(kind)`,
|
||||
`CREATE UNIQUE INDEX IF NOT EXISTS idx_triggers_webhook_secret ON triggers(webhook_secret) WHERE webhook_secret != ''`,
|
||||
|
||||
@@ -0,0 +1,146 @@
|
||||
package store
|
||||
|
||||
import (
|
||||
"database/sql"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"github.com/google/uuid"
|
||||
)
|
||||
|
||||
// CreateVolumeSnapshot inserts a snapshot metadata record. ID is generated
|
||||
// when empty; CreatedAt is stamped server-side.
|
||||
func (s *Store) CreateVolumeSnapshot(v VolumeSnapshot) (VolumeSnapshot, error) {
|
||||
if v.WorkloadID == "" || v.Filename == "" {
|
||||
return VolumeSnapshot{}, fmt.Errorf("volume_snapshot: workload_id and filename are required")
|
||||
}
|
||||
if v.ID == "" {
|
||||
v.ID = uuid.New().String()
|
||||
}
|
||||
if v.Manifest == "" {
|
||||
v.Manifest = "[]"
|
||||
}
|
||||
v.CreatedAt = Now()
|
||||
|
||||
if _, err := s.db.Exec(
|
||||
`INSERT INTO volume_snapshots (id, workload_id, label, filename, size_bytes, manifest, created_at)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)`,
|
||||
v.ID, v.WorkloadID, v.Label, v.Filename, v.SizeBytes, v.Manifest, v.CreatedAt,
|
||||
); err != nil {
|
||||
return VolumeSnapshot{}, fmt.Errorf("insert volume snapshot: %w", err)
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
// GetVolumeSnapshot returns one snapshot by ID.
|
||||
func (s *Store) GetVolumeSnapshot(id string) (VolumeSnapshot, error) {
|
||||
var v VolumeSnapshot
|
||||
err := s.db.QueryRow(
|
||||
`SELECT id, workload_id, label, filename, size_bytes, manifest, created_at
|
||||
FROM volume_snapshots WHERE id = ?`, id,
|
||||
).Scan(&v.ID, &v.WorkloadID, &v.Label, &v.Filename, &v.SizeBytes, &v.Manifest, &v.CreatedAt)
|
||||
if errors.Is(err, sql.ErrNoRows) {
|
||||
return VolumeSnapshot{}, fmt.Errorf("volume snapshot %s: %w", id, ErrNotFound)
|
||||
}
|
||||
if err != nil {
|
||||
return VolumeSnapshot{}, fmt.Errorf("query volume snapshot: %w", err)
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
|
||||
// ListVolumeSnapshots returns a workload's snapshots, newest first.
|
||||
func (s *Store) ListVolumeSnapshots(workloadID string) ([]VolumeSnapshot, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT id, workload_id, label, filename, size_bytes, manifest, created_at
|
||||
FROM volume_snapshots WHERE workload_id = ? ORDER BY created_at DESC`, workloadID,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query volume snapshots: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := []VolumeSnapshot{}
|
||||
for rows.Next() {
|
||||
v, err := scanVolumeSnapshot(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, v)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// DeleteVolumeSnapshot removes one snapshot row by ID.
|
||||
func (s *Store) DeleteVolumeSnapshot(id string) error {
|
||||
result, err := s.db.Exec(`DELETE FROM volume_snapshots WHERE id = ?`, id)
|
||||
if err != nil {
|
||||
return fmt.Errorf("delete volume snapshot: %w", err)
|
||||
}
|
||||
if n, _ := result.RowsAffected(); n == 0 {
|
||||
return fmt.Errorf("volume snapshot %s: %w", id, ErrNotFound)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// CountVolumeSnapshots returns how many snapshots a workload has.
|
||||
func (s *Store) CountVolumeSnapshots(workloadID string) (int, error) {
|
||||
var n int
|
||||
if err := s.db.QueryRow(
|
||||
`SELECT COUNT(*) FROM volume_snapshots WHERE workload_id = ?`, workloadID,
|
||||
).Scan(&n); err != nil {
|
||||
return 0, fmt.Errorf("count volume snapshots: %w", err)
|
||||
}
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// GetOldestVolumeSnapshots returns the N oldest snapshots for a workload, for
|
||||
// retention pruning.
|
||||
func (s *Store) GetOldestVolumeSnapshots(workloadID string, limit int) ([]VolumeSnapshot, error) {
|
||||
rows, err := s.db.Query(
|
||||
`SELECT id, workload_id, label, filename, size_bytes, manifest, created_at
|
||||
FROM volume_snapshots WHERE workload_id = ? ORDER BY created_at ASC LIMIT ?`, workloadID, limit,
|
||||
)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query oldest volume snapshots: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := []VolumeSnapshot{}
|
||||
for rows.Next() {
|
||||
v, err := scanVolumeSnapshot(rows)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out = append(out, v)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
// AllVolumeSnapshotFilenames returns every snapshot archive filename across all
|
||||
// workloads, for orphan-file reconciliation at startup.
|
||||
func (s *Store) AllVolumeSnapshotFilenames() ([]string, error) {
|
||||
rows, err := s.db.Query(`SELECT filename FROM volume_snapshots`)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query snapshot filenames: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
out := []string{}
|
||||
for rows.Next() {
|
||||
var name string
|
||||
if err := rows.Scan(&name); err != nil {
|
||||
return nil, fmt.Errorf("scan snapshot filename: %w", err)
|
||||
}
|
||||
out = append(out, name)
|
||||
}
|
||||
return out, rows.Err()
|
||||
}
|
||||
|
||||
func scanVolumeSnapshot(rows *sql.Rows) (VolumeSnapshot, error) {
|
||||
var v VolumeSnapshot
|
||||
if err := rows.Scan(&v.ID, &v.WorkloadID, &v.Label, &v.Filename,
|
||||
&v.SizeBytes, &v.Manifest, &v.CreatedAt); err != nil {
|
||||
return VolumeSnapshot{}, fmt.Errorf("scan volume snapshot: %w", err)
|
||||
}
|
||||
return v, nil
|
||||
}
|
||||
@@ -0,0 +1,140 @@
|
||||
package volsnap
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"compress/gzip"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
// writeArchive serializes the given host-bind volume directories into a
|
||||
// gzip-compressed tar at dest. Each volume's files live under an integer
|
||||
// subdirectory (its manifest Index); a manifest.json at the archive root makes
|
||||
// the archive self-describing. Returns the manifest describing what was
|
||||
// captured.
|
||||
//
|
||||
// Only regular files and directories are archived. Symlinks and special files
|
||||
// (devices, sockets, fifos) are skipped — this keeps capture safe and avoids
|
||||
// recording links whose targets would be meaningless or escape the volume on a
|
||||
// later restore. A torn snapshot is possible if the app writes during capture;
|
||||
// callers should surface that caveat.
|
||||
func writeArchive(dest string, refs []VolumeRef) ([]SnapshotVolume, error) {
|
||||
// O_EXCL: never clobber an existing file (filenames are unique per call).
|
||||
f, err := os.OpenFile(dest, os.O_CREATE|os.O_EXCL|os.O_WRONLY, 0o600)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("create snapshot file: %w", err)
|
||||
}
|
||||
defer f.Close()
|
||||
|
||||
gz := gzip.NewWriter(f)
|
||||
tw := tar.NewWriter(gz)
|
||||
|
||||
manifest := make([]SnapshotVolume, 0, len(refs))
|
||||
for i, ref := range refs {
|
||||
manifest = append(manifest, SnapshotVolume{Index: i, Target: ref.Target, Scope: ref.Scope, Source: ref.Source})
|
||||
if err := addDir(tw, ref.HostPath, fmt.Sprintf("%d", i)); err != nil {
|
||||
_ = tw.Close()
|
||||
_ = gz.Close()
|
||||
_ = f.Close()
|
||||
os.Remove(dest)
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
if err := writeManifestEntry(tw, manifest); err != nil {
|
||||
_ = tw.Close()
|
||||
_ = gz.Close()
|
||||
os.Remove(dest)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if err := tw.Close(); err != nil {
|
||||
_ = gz.Close()
|
||||
os.Remove(dest)
|
||||
return nil, fmt.Errorf("finalize tar: %w", err)
|
||||
}
|
||||
if err := gz.Close(); err != nil {
|
||||
os.Remove(dest)
|
||||
return nil, fmt.Errorf("finalize gzip: %w", err)
|
||||
}
|
||||
if err := f.Close(); err != nil {
|
||||
os.Remove(dest)
|
||||
return nil, fmt.Errorf("close snapshot file: %w", err)
|
||||
}
|
||||
return manifest, nil
|
||||
}
|
||||
|
||||
// addDir walks root and writes its regular files and directories into tw under
|
||||
// the given archive prefix.
|
||||
func addDir(tw *tar.Writer, root, prefix string) error {
|
||||
return filepath.WalkDir(root, func(p string, d fs.DirEntry, walkErr error) error {
|
||||
if walkErr != nil {
|
||||
return fmt.Errorf("walk %s: %w", p, walkErr)
|
||||
}
|
||||
// Skip symlinks and special files; archive only dirs and regular files.
|
||||
if d.Type()&fs.ModeSymlink != 0 {
|
||||
return nil
|
||||
}
|
||||
if !d.IsDir() && !d.Type().IsRegular() {
|
||||
return nil
|
||||
}
|
||||
|
||||
rel, err := filepath.Rel(root, p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("relativize %s: %w", p, err)
|
||||
}
|
||||
name := prefix
|
||||
if rel != "." {
|
||||
name = path.Join(prefix, filepath.ToSlash(rel))
|
||||
}
|
||||
|
||||
info, err := d.Info()
|
||||
if err != nil {
|
||||
return fmt.Errorf("stat %s: %w", p, err)
|
||||
}
|
||||
hdr, err := tar.FileInfoHeader(info, "")
|
||||
if err != nil {
|
||||
return fmt.Errorf("tar header %s: %w", p, err)
|
||||
}
|
||||
hdr.Name = name
|
||||
if d.IsDir() {
|
||||
hdr.Name += "/"
|
||||
}
|
||||
if err := tw.WriteHeader(hdr); err != nil {
|
||||
return fmt.Errorf("write tar header %s: %w", name, err)
|
||||
}
|
||||
if d.IsDir() {
|
||||
return nil
|
||||
}
|
||||
|
||||
src, err := os.Open(p)
|
||||
if err != nil {
|
||||
return fmt.Errorf("open %s: %w", p, err)
|
||||
}
|
||||
defer src.Close()
|
||||
if _, err := io.Copy(tw, src); err != nil {
|
||||
return fmt.Errorf("copy %s: %w", p, err)
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
func writeManifestEntry(tw *tar.Writer, manifest []SnapshotVolume) error {
|
||||
data, err := json.MarshalIndent(manifest, "", " ")
|
||||
if err != nil {
|
||||
return fmt.Errorf("encode manifest: %w", err)
|
||||
}
|
||||
hdr := &tar.Header{Name: "manifest.json", Mode: 0o600, Size: int64(len(data)), Typeflag: tar.TypeReg}
|
||||
if err := tw.WriteHeader(hdr); err != nil {
|
||||
return fmt.Errorf("write manifest header: %w", err)
|
||||
}
|
||||
if _, err := tw.Write(data); err != nil {
|
||||
return fmt.Errorf("write manifest: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -0,0 +1,117 @@
|
||||
package volsnap
|
||||
|
||||
import (
|
||||
"archive/tar"
|
||||
"compress/gzip"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestWriteArchiveRoundTrip(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
mustWrite(t, filepath.Join(root, "a.txt"), "hello")
|
||||
if err := os.MkdirAll(filepath.Join(root, "sub"), 0o755); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
mustWrite(t, filepath.Join(root, "sub", "b.txt"), "world")
|
||||
|
||||
dest := filepath.Join(t.TempDir(), "snap.tar.gz")
|
||||
refs := []VolumeRef{{Target: "/data", Scope: "project", Source: "data", HostPath: root}}
|
||||
|
||||
manifest, err := writeArchive(dest, refs)
|
||||
if err != nil {
|
||||
t.Fatalf("writeArchive: %v", err)
|
||||
}
|
||||
if len(manifest) != 1 || manifest[0].Index != 0 || manifest[0].Target != "/data" || manifest[0].Scope != "project" {
|
||||
t.Fatalf("unexpected manifest: %+v", manifest)
|
||||
}
|
||||
|
||||
entries := readArchive(t, dest)
|
||||
for _, want := range []string{"0/a.txt", "0/sub/b.txt", "manifest.json"} {
|
||||
if _, ok := entries[want]; !ok {
|
||||
keys := make([]string, 0, len(entries))
|
||||
for k := range entries {
|
||||
keys = append(keys, k)
|
||||
}
|
||||
t.Fatalf("archive missing %q; got %v", want, keys)
|
||||
}
|
||||
}
|
||||
if got := entries["0/a.txt"]; got != "hello" {
|
||||
t.Errorf("0/a.txt = %q, want %q", got, "hello")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteArchiveRefusesExisting(t *testing.T) {
|
||||
dest := filepath.Join(t.TempDir(), "snap.tar.gz")
|
||||
mustWrite(t, dest, "existing")
|
||||
if _, err := writeArchive(dest, nil); err == nil {
|
||||
t.Fatal("expected error writing over an existing file (O_EXCL)")
|
||||
}
|
||||
}
|
||||
|
||||
func TestWriteArchiveSkipsSymlinks(t *testing.T) {
|
||||
root := t.TempDir()
|
||||
mustWrite(t, filepath.Join(root, "real.txt"), "data")
|
||||
if err := os.Symlink(filepath.Join(root, "real.txt"), filepath.Join(root, "link.txt")); err != nil {
|
||||
t.Skipf("symlinks unavailable on this platform: %v", err)
|
||||
}
|
||||
|
||||
dest := filepath.Join(t.TempDir(), "snap.tar.gz")
|
||||
if _, err := writeArchive(dest, []VolumeRef{{Target: "/d", Scope: "project", HostPath: root}}); err != nil {
|
||||
t.Fatalf("writeArchive: %v", err)
|
||||
}
|
||||
entries := readArchive(t, dest)
|
||||
if _, ok := entries["0/link.txt"]; ok {
|
||||
t.Error("symlink should have been skipped, but it is in the archive")
|
||||
}
|
||||
if _, ok := entries["0/real.txt"]; !ok {
|
||||
t.Error("regular file should be archived")
|
||||
}
|
||||
}
|
||||
|
||||
func mustWrite(t *testing.T, path, content string) {
|
||||
t.Helper()
|
||||
if err := os.WriteFile(path, []byte(content), 0o644); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
}
|
||||
|
||||
// readArchive returns a map of regular-file entry name -> content. Directory
|
||||
// entries are recorded with an empty string so their presence can be asserted.
|
||||
func readArchive(t *testing.T, path string) map[string]string {
|
||||
t.Helper()
|
||||
f, err := os.Open(path)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer f.Close()
|
||||
gz, err := gzip.NewReader(f)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
defer gz.Close()
|
||||
|
||||
out := map[string]string{}
|
||||
tr := tar.NewReader(gz)
|
||||
for {
|
||||
hdr, err := tr.Next()
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if hdr.Typeflag == tar.TypeDir {
|
||||
out[hdr.Name] = ""
|
||||
continue
|
||||
}
|
||||
data, err := io.ReadAll(tr)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
out[hdr.Name] = string(data)
|
||||
}
|
||||
return out
|
||||
}
|
||||
@@ -0,0 +1,207 @@
|
||||
package volsnap
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log/slog"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
)
|
||||
|
||||
// maxSnapshotsPerWorkload caps how many snapshots are retained per app. On
|
||||
// create, older snapshots beyond this count are pruned (best-effort) so volume
|
||||
// snapshots cannot grow the data disk without bound.
|
||||
const maxSnapshotsPerWorkload = 20
|
||||
|
||||
// ErrNoSnapshotData is returned by Create when the workload has no resolved
|
||||
// host-bind volume directory to capture. It is a client-actionable condition
|
||||
// (HTTP 400), distinct from internal failures (HTTP 500).
|
||||
var ErrNoSnapshotData = errors.New("no snapshottable volume data for this app")
|
||||
|
||||
// Engine creates and manages volume snapshots under <dataDir>/snapshots.
|
||||
type Engine struct {
|
||||
mu sync.Mutex
|
||||
store *store.Store
|
||||
snapDir string
|
||||
}
|
||||
|
||||
// New creates the snapshot engine, ensuring the snapshot directory exists.
|
||||
func New(st *store.Store, dataDir string) (*Engine, error) {
|
||||
dir := filepath.Join(dataDir, "snapshots")
|
||||
if err := os.MkdirAll(dir, 0o755); err != nil {
|
||||
return nil, fmt.Errorf("create snapshot directory: %w", err)
|
||||
}
|
||||
return &Engine{store: st, snapDir: dir}, nil
|
||||
}
|
||||
|
||||
// SnapDir returns the directory holding snapshot archives.
|
||||
func (e *Engine) SnapDir() string { return e.snapDir }
|
||||
|
||||
// Create captures a snapshot of the workload's host-bind data volumes.
|
||||
func (e *Engine) Create(w store.Workload, settings store.Settings, label string) (store.VolumeSnapshot, error) {
|
||||
refs, _, err := SnapshotableVolumes(e.store, w, settings)
|
||||
if err != nil {
|
||||
return store.VolumeSnapshot{}, fmt.Errorf("enumerate volumes: %w", err)
|
||||
}
|
||||
if len(refs) == 0 {
|
||||
return store.VolumeSnapshot{}, ErrNoSnapshotData
|
||||
}
|
||||
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
filename := fmt.Sprintf("%s-%s-%s.tar.gz",
|
||||
idShort(w.ID), time.Now().UTC().Format("20060102-150405"), uuid.New().String()[:8])
|
||||
dest := filepath.Join(e.snapDir, filename)
|
||||
|
||||
manifest, err := writeArchive(dest, refs)
|
||||
if err != nil {
|
||||
return store.VolumeSnapshot{}, err
|
||||
}
|
||||
|
||||
info, err := os.Stat(dest)
|
||||
if err != nil {
|
||||
os.Remove(dest)
|
||||
return store.VolumeSnapshot{}, fmt.Errorf("stat snapshot: %w", err)
|
||||
}
|
||||
manifestJSON, err := json.Marshal(manifest)
|
||||
if err != nil {
|
||||
os.Remove(dest)
|
||||
return store.VolumeSnapshot{}, fmt.Errorf("encode manifest: %w", err)
|
||||
}
|
||||
|
||||
row, err := e.store.CreateVolumeSnapshot(store.VolumeSnapshot{
|
||||
WorkloadID: w.ID,
|
||||
Label: strings.TrimSpace(label),
|
||||
Filename: filename,
|
||||
SizeBytes: info.Size(),
|
||||
Manifest: string(manifestJSON),
|
||||
})
|
||||
if err != nil {
|
||||
os.Remove(dest) // best-effort: don't leak an orphan file
|
||||
return store.VolumeSnapshot{}, fmt.Errorf("record snapshot: %w", err)
|
||||
}
|
||||
|
||||
slog.Info("volume snapshot created", "id", row.ID, "workload", w.ID,
|
||||
"volumes", len(manifest), "size", info.Size())
|
||||
|
||||
e.pruneWorkload(w.ID)
|
||||
return row, nil
|
||||
}
|
||||
|
||||
// List returns a workload's snapshots, newest first.
|
||||
func (e *Engine) List(workloadID string) ([]store.VolumeSnapshot, error) {
|
||||
return e.store.ListVolumeSnapshots(workloadID)
|
||||
}
|
||||
|
||||
// Get returns one snapshot by id.
|
||||
func (e *Engine) Get(id string) (store.VolumeSnapshot, error) {
|
||||
return e.store.GetVolumeSnapshot(id)
|
||||
}
|
||||
|
||||
// Delete removes a snapshot's archive file and its metadata row.
|
||||
func (e *Engine) Delete(id string) error {
|
||||
snap, err := e.store.GetVolumeSnapshot(id)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
if p, perr := e.FilePath(snap); perr == nil {
|
||||
if rmErr := os.Remove(p); rmErr != nil && !os.IsNotExist(rmErr) {
|
||||
slog.Warn("volume snapshot: remove file", "id", id, "error", rmErr)
|
||||
}
|
||||
}
|
||||
return e.store.DeleteVolumeSnapshot(id)
|
||||
}
|
||||
|
||||
// FilePath resolves a snapshot's archive path and verifies it stays within the
|
||||
// snapshot directory (defence-in-depth against a tampered filename column).
|
||||
func (e *Engine) FilePath(snap store.VolumeSnapshot) (string, error) {
|
||||
base := filepath.Base(snap.Filename)
|
||||
if base == "" || base == "." || base != snap.Filename {
|
||||
return "", fmt.Errorf("invalid snapshot filename")
|
||||
}
|
||||
p := filepath.Join(e.snapDir, base)
|
||||
abs, err := filepath.Abs(p)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
absDir, _ := filepath.Abs(e.snapDir)
|
||||
if !strings.HasPrefix(abs, absDir+string(filepath.Separator)) {
|
||||
return "", fmt.Errorf("snapshot path escapes snapshot directory")
|
||||
}
|
||||
return abs, nil
|
||||
}
|
||||
|
||||
// CleanOrphans removes snapshot archive files that have no metadata row,
|
||||
// reconciling on-disk files against the DB. Workload deletion CASCADEs the
|
||||
// volume_snapshots rows but cannot reach the files; this (run at startup)
|
||||
// reclaims them. Mirrors backup.Engine.CleanOrphans.
|
||||
func (e *Engine) CleanOrphans() (int, error) {
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
|
||||
entries, err := os.ReadDir(e.snapDir)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("read snapshot dir: %w", err)
|
||||
}
|
||||
filenames, err := e.store.AllVolumeSnapshotFilenames()
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("list snapshot filenames: %w", err)
|
||||
}
|
||||
known := make(map[string]bool, len(filenames))
|
||||
for _, f := range filenames {
|
||||
known[f] = true
|
||||
}
|
||||
|
||||
removed := 0
|
||||
for _, ent := range entries {
|
||||
if ent.IsDir() || known[ent.Name()] {
|
||||
continue
|
||||
}
|
||||
if err := os.Remove(filepath.Join(e.snapDir, ent.Name())); err != nil {
|
||||
slog.Warn("volume snapshot: remove orphan", "file", ent.Name(), "error", err)
|
||||
continue
|
||||
}
|
||||
removed++
|
||||
}
|
||||
return removed, nil
|
||||
}
|
||||
|
||||
// pruneWorkload deletes snapshots beyond maxSnapshotsPerWorkload for one
|
||||
// workload (oldest first). Best-effort: caller already holds e.mu.
|
||||
func (e *Engine) pruneWorkload(workloadID string) {
|
||||
count, err := e.store.CountVolumeSnapshots(workloadID)
|
||||
if err != nil || count <= maxSnapshotsPerWorkload {
|
||||
return
|
||||
}
|
||||
oldest, err := e.store.GetOldestVolumeSnapshots(workloadID, count-maxSnapshotsPerWorkload)
|
||||
if err != nil {
|
||||
slog.Warn("volume snapshot: prune query", "workload", workloadID, "error", err)
|
||||
return
|
||||
}
|
||||
for _, snap := range oldest {
|
||||
if p, perr := e.FilePath(snap); perr == nil {
|
||||
_ = os.Remove(p)
|
||||
}
|
||||
if derr := e.store.DeleteVolumeSnapshot(snap.ID); derr != nil {
|
||||
slog.Warn("volume snapshot: prune delete", "id", snap.ID, "error", derr)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func idShort(id string) string {
|
||||
if len(id) > 8 {
|
||||
return id[:8]
|
||||
}
|
||||
return id
|
||||
}
|
||||
@@ -0,0 +1,146 @@
|
||||
// Package volsnap captures and manages per-workload snapshots of an app's
|
||||
// host-bind data volumes. It is deliberately independent of internal/backup
|
||||
// (which is SQLite-specific): a snapshot here is a tar.gz of the resolved
|
||||
// volume directories, recorded in the volume_snapshots table.
|
||||
//
|
||||
// Phase 2a-i covers CAPTURE only (create/list/delete/download). The restore
|
||||
// path — which overwrites live data and needs container quiesce + atomic swap
|
||||
// — is intentionally a separate, later phase.
|
||||
package volsnap
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"os"
|
||||
|
||||
"github.com/alexei/tinyforge/internal/store"
|
||||
"github.com/alexei/tinyforge/internal/volume"
|
||||
)
|
||||
|
||||
// supportedScopes are the host-bind volume scopes phase 2a-i can snapshot.
|
||||
// Each resolves to a real host directory the running container binds. Excluded
|
||||
// for now: instance (needs the deployed image tag to resolve a per-tag dir),
|
||||
// named/project_named (Docker named volumes — need a docker-run-tar primitive),
|
||||
// and ephemeral (tmpfs — no data to capture).
|
||||
var supportedScopes = map[string]bool{
|
||||
string(store.VolumeScopeAbsolute): true,
|
||||
string(store.VolumeScopeStage): true,
|
||||
string(store.VolumeScopeProject): true,
|
||||
}
|
||||
|
||||
// SnapshotVolume is one volume covered by a snapshot. It is persisted in the
|
||||
// snapshot row's manifest (JSON) and written into the archive so a future
|
||||
// restore can re-resolve the target even if volume settings drift. Index names
|
||||
// the archive subdirectory holding that volume's files.
|
||||
type SnapshotVolume struct {
|
||||
Index int `json:"index"`
|
||||
Target string `json:"target"`
|
||||
Scope string `json:"scope"`
|
||||
Source string `json:"source"`
|
||||
}
|
||||
|
||||
// VolumeRef is a resolved, on-disk host-bind volume eligible for snapshotting.
|
||||
type VolumeRef struct {
|
||||
Target string
|
||||
Scope string
|
||||
Source string
|
||||
HostPath string
|
||||
}
|
||||
|
||||
// SkippedVolume is a declared volume that cannot be snapshotted, with the
|
||||
// reason surfaced to the UI so users are never misled into thinking data is
|
||||
// captured when it is not.
|
||||
type SkippedVolume struct {
|
||||
Target string `json:"target"`
|
||||
Scope string `json:"scope"`
|
||||
Reason string `json:"reason"`
|
||||
}
|
||||
|
||||
// scVolumes is the minimal shape parsed out of an image workload's
|
||||
// source_config — just enough to learn its declared volumes without importing
|
||||
// the image source package.
|
||||
type scVolumes struct {
|
||||
Volumes []struct {
|
||||
Source string `json:"source"`
|
||||
Target string `json:"target"`
|
||||
Scope string `json:"scope"`
|
||||
Name string `json:"name"`
|
||||
} `json:"volumes"`
|
||||
}
|
||||
|
||||
// SnapshotableVolumes enumerates a workload's data volumes and splits them into
|
||||
// those that can be snapshotted now (resolved host-bind dirs that exist on
|
||||
// disk) and those that are skipped (with a reason). It mirrors the image
|
||||
// source's computeMounts merge: source_config volumes overlaid by persisted
|
||||
// workload_volumes rows (persisted wins on a target conflict).
|
||||
//
|
||||
// Only image-source workloads declare host-bind data volumes today; for any
|
||||
// other source kind both slices come back empty.
|
||||
func SnapshotableVolumes(st *store.Store, w store.Workload, settings store.Settings) (refs []VolumeRef, skipped []SkippedVolume, err error) {
|
||||
if w.SourceKind != "image" {
|
||||
return nil, nil, nil
|
||||
}
|
||||
|
||||
byTarget := map[string]store.WorkloadVolume{}
|
||||
|
||||
var cfg scVolumes
|
||||
if w.SourceConfig != "" {
|
||||
// Best-effort: a malformed config simply yields no inline volumes; the
|
||||
// persisted rows below still apply.
|
||||
_ = json.Unmarshal([]byte(w.SourceConfig), &cfg)
|
||||
}
|
||||
for _, v := range cfg.Volumes {
|
||||
if v.Target == "" {
|
||||
continue
|
||||
}
|
||||
byTarget[v.Target] = store.WorkloadVolume{Source: v.Source, Target: v.Target, Scope: v.Scope, Name: v.Name}
|
||||
}
|
||||
persisted, perr := st.ListWorkloadVolumes(w.ID)
|
||||
if perr != nil {
|
||||
return nil, nil, perr
|
||||
}
|
||||
for _, p := range persisted {
|
||||
byTarget[p.Target] = store.WorkloadVolume{Source: p.Source, Target: p.Target, Scope: p.Scope, Name: p.Name}
|
||||
}
|
||||
|
||||
params := volume.ResolveWorkloadParams{
|
||||
BasePath: settings.BaseVolumePath,
|
||||
WorkloadID: w.ID,
|
||||
WorkloadName: w.Name,
|
||||
AllowedVolumePaths: settings.AllowedVolumePaths,
|
||||
}
|
||||
|
||||
for _, v := range byTarget {
|
||||
if v.Target == "" {
|
||||
continue
|
||||
}
|
||||
if !supportedScopes[v.Scope] {
|
||||
skipped = append(skipped, SkippedVolume{Target: v.Target, Scope: v.Scope, Reason: skipReason(v.Scope)})
|
||||
continue
|
||||
}
|
||||
hostPath, rerr := volume.ResolveWorkloadPath(v, params)
|
||||
if rerr != nil {
|
||||
skipped = append(skipped, SkippedVolume{Target: v.Target, Scope: v.Scope, Reason: rerr.Error()})
|
||||
continue
|
||||
}
|
||||
info, serr := os.Stat(hostPath)
|
||||
if serr != nil || !info.IsDir() {
|
||||
skipped = append(skipped, SkippedVolume{Target: v.Target, Scope: v.Scope, Reason: "no data on disk yet"})
|
||||
continue
|
||||
}
|
||||
refs = append(refs, VolumeRef{Target: v.Target, Scope: v.Scope, Source: v.Source, HostPath: hostPath})
|
||||
}
|
||||
return refs, skipped, nil
|
||||
}
|
||||
|
||||
func skipReason(scope string) string {
|
||||
switch scope {
|
||||
case string(store.VolumeScopeInstance):
|
||||
return "instance-scoped volumes are not yet snapshottable"
|
||||
case string(store.VolumeScopeNamed), string(store.VolumeScopeProjectNamed):
|
||||
return "Docker named volumes are not yet snapshottable"
|
||||
case string(store.VolumeScopeEphemeral):
|
||||
return "ephemeral (tmpfs) volumes hold no persistent data"
|
||||
default:
|
||||
return "unsupported volume scope"
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user