410a131cec
This session (frontend focus):
- Rebuild /apps/new as a 4-step wizard (Basics → Configure → Trigger → Review):
WizardRail, SourceKindPicker card grid, AppManifest review, per-step validation,
ConfirmDialog-based unsaved-changes guard.
- Extract lib/workload/sourceForms.ts (single source of truth for source_config)
+ {Image,Compose,Static,Dockerfile}SourceForm + StaticDiscoveryWizard; fold the
/apps/[id] edit form onto the same components (removes the duplication). Add
vitest + sourceForms unit tests.
- Branch preview environments UI: /chain is_preview/preview_branch + a Preview
environments panel on /apps/[id] (per-branch URLs, ConfirmDialog teardown, armed
state); RegistryImagePicker on the registry trigger and the image source.
- Fixes: image-inspect 404 -> admin-gated POST /api/discovery/image/inspect;
conflict-panel blur flicker; friendly localized discovery errors; CPU/Memory
label hints; dashboard + /apps "Total workloads" count only source_kind workloads
(drop stale trigger_kind gate); NPM cert/access-list name cache; EntityPicker
empty-list guard.
- Update CLAUDE.md frontend conventions + add a Build & Test section.
Also captures pre-existing in-progress platform work (not from this session):
workload notifications, Prometheus metrics export, store lockfile, health probes,
backup hardening, and related store/webhook/scheduler changes.
474 lines
14 KiB
Go
474 lines
14 KiB
Go
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// validateExtraJSON guards the extra_json column against malformed
// documents. The codemap (docs/CODEMAPS/container-extra-json.md) says
// readers tolerate unknown keys, but that only holds when the stored value
// parses as JSON in the first place: a buggy plugin writing a malformed
// value would silently break every reader, and there is no schema-level
// check to catch it. Enforcing the rule at the store boundary keeps the
// invariant cheap and obvious.
//
// The empty string is accepted as-is. Any other value must satisfy
// json.Valid; otherwise the returned error reports the size of the rejected
// payload (the payload itself is not echoed back).
func validateExtraJSON(v string) error {
	if v == "" || json.Valid([]byte(v)) {
		return nil
	}
	return fmt.Errorf("extra_json: not valid JSON (%d bytes)", len(v))
}
|
|
|
|
// containerColumns is the canonical, ordered column list shared by the
// `containers` queries in this file. The order is load-bearing: scanContainer
// reads destinations in exactly this sequence, and the INSERT statements bind
// their 18 placeholders in the same sequence.
//
// stage_id is populated by the deployer for project containers (so
// ListProxyRoutes survives stage renames) and left empty for stacks and
// sites.
const containerColumns = `id, workload_id, workload_kind, role, stage_id, container_id,
	image_ref, image_tag, host, state, port,
	subdomain, proxy_route_id, npm_proxy_id,
	last_seen_at, extra_json, created_at, updated_at`
|
|
|
|
func scanContainer(scanner interface{ Scan(...any) error }) (Container, error) {
|
|
var c Container
|
|
err := scanner.Scan(
|
|
&c.ID, &c.WorkloadID, &c.WorkloadKind, &c.Role, &c.StageID, &c.ContainerID,
|
|
&c.ImageRef, &c.ImageTag, &c.Host, &c.State, &c.Port,
|
|
&c.Subdomain, &c.ProxyRouteID, &c.NpmProxyID,
|
|
&c.LastSeenAt, &c.ExtraJSON, &c.CreatedAt, &c.UpdatedAt,
|
|
)
|
|
return c, err
|
|
}
|
|
|
|
// CreateContainer inserts a new container row, generating an ID if none provided.
|
|
// Use this when scheduling a new container (before Docker create returns an ID).
|
|
func (s *Store) CreateContainer(c Container) (Container, error) {
|
|
if c.ID == "" {
|
|
c.ID = uuid.New().String()
|
|
}
|
|
if c.Host == "" {
|
|
c.Host = "local"
|
|
}
|
|
c.CreatedAt = Now()
|
|
c.UpdatedAt = c.CreatedAt
|
|
if c.ExtraJSON == "" {
|
|
c.ExtraJSON = "{}"
|
|
}
|
|
if err := validateExtraJSON(c.ExtraJSON); err != nil {
|
|
return Container{}, err
|
|
}
|
|
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO containers (`+containerColumns+`)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.StageID, c.ContainerID,
|
|
c.ImageRef, c.ImageTag, c.Host, c.State, c.Port,
|
|
c.Subdomain, c.ProxyRouteID, c.NpmProxyID,
|
|
c.LastSeenAt, c.ExtraJSON, c.CreatedAt, c.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return Container{}, fmt.Errorf("insert container: %w", err)
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// UpsertContainer is the full-write path used by the deployer paths
|
|
// (stack manager, static-site manager) that own all fields of a row. Inserts
|
|
// if missing, replaces every column on conflict. The reconciler must NOT call
|
|
// this — it would clobber deployer-written subdomain / proxy_route_id /
|
|
// npm_proxy_id / image_tag with the empty values it doesn't know about. Use
|
|
// ReconcileContainer instead.
|
|
func (s *Store) UpsertContainer(c Container) error {
|
|
if c.ID == "" {
|
|
return fmt.Errorf("UpsertContainer: ID is required")
|
|
}
|
|
if c.Host == "" {
|
|
c.Host = "local"
|
|
}
|
|
c.UpdatedAt = Now()
|
|
if c.CreatedAt == "" {
|
|
c.CreatedAt = c.UpdatedAt
|
|
}
|
|
if c.ExtraJSON == "" {
|
|
c.ExtraJSON = "{}"
|
|
}
|
|
if err := validateExtraJSON(c.ExtraJSON); err != nil {
|
|
return err
|
|
}
|
|
|
|
// SQLite UPSERT — INSERT...ON CONFLICT(id) DO UPDATE.
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO containers (`+containerColumns+`)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
workload_id=excluded.workload_id,
|
|
workload_kind=excluded.workload_kind,
|
|
role=excluded.role,
|
|
stage_id=excluded.stage_id,
|
|
container_id=excluded.container_id,
|
|
image_ref=excluded.image_ref,
|
|
image_tag=excluded.image_tag,
|
|
host=excluded.host,
|
|
state=excluded.state,
|
|
port=excluded.port,
|
|
subdomain=excluded.subdomain,
|
|
proxy_route_id=excluded.proxy_route_id,
|
|
npm_proxy_id=excluded.npm_proxy_id,
|
|
last_seen_at=excluded.last_seen_at,
|
|
extra_json=excluded.extra_json,
|
|
updated_at=excluded.updated_at`,
|
|
c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.StageID, c.ContainerID,
|
|
c.ImageRef, c.ImageTag, c.Host, c.State, c.Port,
|
|
c.Subdomain, c.ProxyRouteID, c.NpmProxyID,
|
|
c.LastSeenAt, c.ExtraJSON, c.CreatedAt, c.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("upsert container: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ReconcileContainer is the reconciler's write path. INSERTs a new row when
|
|
// none exists (with all label-derived metadata) and on conflict updates ONLY
|
|
// the Docker-derived fields the reconciler can observe — never touching
|
|
// subdomain / proxy_route_id / npm_proxy_id / image_tag / stage_id, which are
|
|
// owned by the deployer paths and would be wiped to empty if included.
|
|
func (s *Store) ReconcileContainer(c Container) error {
|
|
if c.ID == "" {
|
|
return fmt.Errorf("ReconcileContainer: ID is required")
|
|
}
|
|
if c.Host == "" {
|
|
c.Host = "local"
|
|
}
|
|
c.UpdatedAt = Now()
|
|
if c.CreatedAt == "" {
|
|
c.CreatedAt = c.UpdatedAt
|
|
}
|
|
if c.ExtraJSON == "" {
|
|
c.ExtraJSON = "{}"
|
|
}
|
|
if err := validateExtraJSON(c.ExtraJSON); err != nil {
|
|
return err
|
|
}
|
|
|
|
// extra_json is deliberately NOT in the ON CONFLICT SET clause: the
|
|
// reconciler can't observe per-face route IDs from Docker, and
|
|
// stomping the deployer's writes would orphan proxy routes at
|
|
// teardown.
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO containers (`+containerColumns+`)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
|
ON CONFLICT(id) DO UPDATE SET
|
|
container_id=excluded.container_id,
|
|
image_ref=excluded.image_ref,
|
|
state=excluded.state,
|
|
port=excluded.port,
|
|
last_seen_at=excluded.last_seen_at,
|
|
updated_at=excluded.updated_at`,
|
|
c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.StageID, c.ContainerID,
|
|
c.ImageRef, c.ImageTag, c.Host, c.State, c.Port,
|
|
c.Subdomain, c.ProxyRouteID, c.NpmProxyID,
|
|
c.LastSeenAt, c.ExtraJSON, c.CreatedAt, c.UpdatedAt,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("reconcile container: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetContainerByID returns a single container row.
|
|
func (s *Store) GetContainerByID(id string) (Container, error) {
|
|
c, err := scanContainer(s.db.QueryRow(
|
|
`SELECT `+containerColumns+` FROM containers WHERE id = ?`, id,
|
|
))
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return Container{}, fmt.Errorf("container %s: %w", id, ErrNotFound)
|
|
}
|
|
if err != nil {
|
|
return Container{}, fmt.Errorf("query container: %w", err)
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// GetContainerByDockerID looks up a container row by its Docker container ID.
|
|
// The reconciler uses this to decide between insert (new container) and update.
|
|
func (s *Store) GetContainerByDockerID(dockerID string) (Container, error) {
|
|
if dockerID == "" {
|
|
return Container{}, fmt.Errorf("empty container_id: %w", ErrNotFound)
|
|
}
|
|
c, err := scanContainer(s.db.QueryRow(
|
|
`SELECT `+containerColumns+` FROM containers WHERE container_id = ?`, dockerID,
|
|
))
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return Container{}, ErrNotFound
|
|
}
|
|
if err != nil {
|
|
return Container{}, fmt.Errorf("query container by docker id: %w", err)
|
|
}
|
|
return c, nil
|
|
}
|
|
|
|
// ListProxyRoutes returns proxy-enabled containers joined with their
|
|
// owning workload's name. The legacy stages join is gone — Role is used
|
|
// as the StageName fallback so the Proxies page still reads naturally
|
|
// for project-style workloads. Source is reported as "instance" for
|
|
// back-compat with the Proxies page filter (the frontend keys off the
|
|
// literal string).
|
|
func (s *Store) ListProxyRoutes(domain string) ([]ProxyRoute, error) {
|
|
rows, err := s.db.Query(`
|
|
SELECT c.id, w.id, w.name,
|
|
c.image_tag, c.subdomain, c.container_id, c.port,
|
|
c.proxy_route_id, c.npm_proxy_id, c.state, c.created_at,
|
|
c.role, c.stage_id
|
|
FROM containers c
|
|
JOIN workloads w ON w.id = c.workload_id
|
|
WHERE c.subdomain != '' AND (c.proxy_route_id != '' OR c.npm_proxy_id > 0)
|
|
ORDER BY w.name, c.role, c.created_at DESC`,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query proxy routes: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
routes := []ProxyRoute{}
|
|
for rows.Next() {
|
|
var r ProxyRoute
|
|
var role, stageID string
|
|
if err := rows.Scan(
|
|
&r.InstanceID, &r.ProjectID, &r.ProjectName,
|
|
&r.ImageTag, &r.Subdomain, &r.ContainerID, &r.Port,
|
|
&r.ProxyRouteID, &r.NpmProxyID, &r.Status, &r.CreatedAt,
|
|
&role, &stageID,
|
|
); err != nil {
|
|
return nil, fmt.Errorf("scan proxy route: %w", err)
|
|
}
|
|
r.Source = "instance"
|
|
r.StageID = stageID
|
|
r.StageName = role
|
|
if domain != "" && r.Subdomain != "" {
|
|
r.Domain = r.Subdomain + "." + domain
|
|
}
|
|
routes = append(routes, r)
|
|
}
|
|
return routes, rows.Err()
|
|
}
|
|
|
|
// ListContainersByWorkload returns all containers for a given workload, newest first.
|
|
func (s *Store) ListContainersByWorkload(workloadID string) ([]Container, error) {
|
|
rows, err := s.db.Query(
|
|
`SELECT `+containerColumns+` FROM containers
|
|
WHERE workload_id = ? ORDER BY created_at DESC`,
|
|
workloadID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query containers by workload: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
out := []Container{}
|
|
for rows.Next() {
|
|
c, err := scanContainer(rows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scan container: %w", err)
|
|
}
|
|
out = append(out, c)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// ContainerFilter narrows the result set of ListContainers. Every field is
// optional: the empty string means "do not filter on this field". All
// non-empty fields are combined with AND.
type ContainerFilter struct {
	// WorkloadID matches containers.workload_id exactly.
	WorkloadID string
	// WorkloadKind matches containers.workload_kind exactly.
	WorkloadKind string
	// State matches containers.state exactly.
	State string
	// AppID matches workloads.app_id; setting it makes ListContainers join
	// containers to workloads.
	AppID string
}
|
|
|
|
// ListContainers returns all containers matching the filter, newest first.
|
|
// AppID joins through workloads.
|
|
func (s *Store) ListContainers(f ContainerFilter) ([]Container, error) {
|
|
var (
|
|
where []string
|
|
args []any
|
|
)
|
|
needsAppJoin := f.AppID != ""
|
|
if needsAppJoin {
|
|
where = append(where, "w.app_id = ?")
|
|
args = append(args, f.AppID)
|
|
}
|
|
if f.WorkloadID != "" {
|
|
where = append(where, "c.workload_id = ?")
|
|
args = append(args, f.WorkloadID)
|
|
}
|
|
if f.WorkloadKind != "" {
|
|
where = append(where, "c.workload_kind = ?")
|
|
args = append(args, f.WorkloadKind)
|
|
}
|
|
if f.State != "" {
|
|
where = append(where, "c.state = ?")
|
|
args = append(args, f.State)
|
|
}
|
|
|
|
query := `SELECT ` + prefixCols(containerColumns, "c.") + ` FROM containers c`
|
|
if needsAppJoin {
|
|
query += ` JOIN workloads w ON w.id = c.workload_id`
|
|
}
|
|
if len(where) > 0 {
|
|
query += " WHERE " + strings.Join(where, " AND ")
|
|
}
|
|
query += " ORDER BY c.created_at DESC"
|
|
|
|
rows, err := s.db.Query(query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query containers: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
out := []Container{}
|
|
for rows.Next() {
|
|
c, err := scanContainer(rows)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("scan container: %w", err)
|
|
}
|
|
out = append(out, c)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// UpdateContainer replaces all mutable fields on an existing container row.
|
|
// Use this from the deployer when proxy / subdomain assignments change.
|
|
func (s *Store) UpdateContainer(c Container) error {
|
|
c.UpdatedAt = Now()
|
|
if c.ExtraJSON == "" {
|
|
c.ExtraJSON = "{}"
|
|
}
|
|
if err := validateExtraJSON(c.ExtraJSON); err != nil {
|
|
return err
|
|
}
|
|
result, err := s.db.Exec(
|
|
`UPDATE containers SET workload_id=?, workload_kind=?, role=?, stage_id=?, container_id=?,
|
|
image_ref=?, image_tag=?, host=?, state=?, port=?,
|
|
subdomain=?, proxy_route_id=?, npm_proxy_id=?,
|
|
last_seen_at=?, extra_json=?, updated_at=?
|
|
WHERE id=?`,
|
|
c.WorkloadID, c.WorkloadKind, c.Role, c.StageID, c.ContainerID,
|
|
c.ImageRef, c.ImageTag, c.Host, c.State, c.Port,
|
|
c.Subdomain, c.ProxyRouteID, c.NpmProxyID,
|
|
c.LastSeenAt, c.ExtraJSON, c.UpdatedAt, c.ID,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("update container: %w", err)
|
|
}
|
|
n, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if n == 0 {
|
|
return fmt.Errorf("container %s: %w", c.ID, ErrNotFound)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// UpdateContainerState sets only the state and last_seen_at fields.
|
|
// Used by the reconciler for rapid status flips without a full row read.
|
|
func (s *Store) UpdateContainerState(id, state string) error {
|
|
ts := Now()
|
|
result, err := s.db.Exec(
|
|
`UPDATE containers SET state=?, last_seen_at=?, updated_at=? WHERE id=?`,
|
|
state, ts, ts, id,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("update container state: %w", err)
|
|
}
|
|
n, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if n == 0 {
|
|
return fmt.Errorf("container %s: %w", id, ErrNotFound)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// MarkContainerMissing flips state to 'missing' for a container that the
|
|
// reconciler can no longer find in `docker ps`. Idempotent.
|
|
func (s *Store) MarkContainerMissing(id string) error {
|
|
return s.UpdateContainerState(id, "missing")
|
|
}
|
|
|
|
// DeleteContainer removes a container row. Use when the underlying Docker
|
|
// container has been deleted and we want to forget it entirely (vs. marking missing).
|
|
func (s *Store) DeleteContainer(id string) error {
|
|
result, err := s.db.Exec(`DELETE FROM containers WHERE id = ?`, id)
|
|
if err != nil {
|
|
return fmt.Errorf("delete container: %w", err)
|
|
}
|
|
n, err := result.RowsAffected()
|
|
if err != nil {
|
|
return fmt.Errorf("rows affected: %w", err)
|
|
}
|
|
if n == 0 {
|
|
return fmt.Errorf("container %s: %w", id, ErrNotFound)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// DeleteContainersByWorkload removes every container row for a workload.
|
|
// Used when a workload is deleted and we want to drop its container index entries.
|
|
func (s *Store) DeleteContainersByWorkload(workloadID string) error {
|
|
_, err := s.db.Exec(`DELETE FROM containers WHERE workload_id = ?`, workloadID)
|
|
if err != nil {
|
|
return fmt.Errorf("delete containers by workload: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ListMissingSweepRows returns rows the reconciler must consider for the
|
|
// missing-state sweep — bound to a real Docker container and not already
|
|
// flipped to 'missing'. Used in place of a full ListContainers scan to keep
|
|
// the per-tick query proportional to the live set.
|
|
func (s *Store) ListMissingSweepRows() ([]struct {
|
|
ID string
|
|
ContainerID string
|
|
}, error) {
|
|
rows, err := s.db.Query(
|
|
`SELECT id, container_id FROM containers
|
|
WHERE container_id != '' AND state != 'missing'`,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query missing-sweep rows: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
out := []struct {
|
|
ID string
|
|
ContainerID string
|
|
}{}
|
|
for rows.Next() {
|
|
var r struct {
|
|
ID string
|
|
ContainerID string
|
|
}
|
|
if err := rows.Scan(&r.ID, &r.ContainerID); err != nil {
|
|
return nil, fmt.Errorf("scan missing-sweep row: %w", err)
|
|
}
|
|
out = append(out, r)
|
|
}
|
|
return out, rows.Err()
|
|
}
|
|
|
|
// prefixCols qualifies every column in a comma-separated list with prefix
// (typically a table alias such as "c."). Whitespace around each column
// name, including newlines, is trimmed, and the result is re-joined with
// ", ". Used by ListContainers when joining containers (alias `c`) to
// workloads.
//
// Example: prefixCols("id, name", "c.") yields "c.id, c.name".
func prefixCols(cols, prefix string) string {
	names := strings.Split(cols, ",")
	qualified := make([]string, 0, len(names))
	for _, name := range names {
		qualified = append(qualified, prefix+strings.TrimSpace(name))
	}
	return strings.Join(qualified, ", ")
}
|