package store import ( "database/sql" "errors" "fmt" "strings" "github.com/google/uuid" ) const containerColumns = `id, workload_id, workload_kind, role, container_id, image_ref, image_tag, host, state, port, subdomain, proxy_route_id, npm_proxy_id, last_seen_at, extra_json, created_at, updated_at` func scanContainer(scanner interface{ Scan(...any) error }) (Container, error) { var c Container err := scanner.Scan( &c.ID, &c.WorkloadID, &c.WorkloadKind, &c.Role, &c.ContainerID, &c.ImageRef, &c.ImageTag, &c.Host, &c.State, &c.Port, &c.Subdomain, &c.ProxyRouteID, &c.NpmProxyID, &c.LastSeenAt, &c.ExtraJSON, &c.CreatedAt, &c.UpdatedAt, ) return c, err } // CreateContainer inserts a new container row, generating an ID if none provided. // Use this when scheduling a new container (before Docker create returns an ID). func (s *Store) CreateContainer(c Container) (Container, error) { if c.ID == "" { c.ID = uuid.New().String() } if c.Host == "" { c.Host = "local" } if c.ExtraJSON == "" { c.ExtraJSON = "{}" } c.CreatedAt = Now() c.UpdatedAt = c.CreatedAt _, err := s.db.Exec( `INSERT INTO containers (`+containerColumns+`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`, c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.ContainerID, c.ImageRef, c.ImageTag, c.Host, c.State, c.Port, c.Subdomain, c.ProxyRouteID, c.NpmProxyID, c.LastSeenAt, c.ExtraJSON, c.CreatedAt, c.UpdatedAt, ) if err != nil { return Container{}, fmt.Errorf("insert container: %w", err) } return c, nil } // UpsertContainer is the reconciler's primary write path. It updates an // existing row (matched by ID) or inserts a new one. Caller is responsible // for setting ID — use container_id-based lookup before calling this. func (s *Store) UpsertContainer(c Container) error { if c.ID == "" { return fmt.Errorf("UpsertContainer: ID is required") } if c.Host == "" { c.Host = "local" } if c.ExtraJSON == "" { c.ExtraJSON = "{}" } c.UpdatedAt = Now() if c.CreatedAt == "" { c.CreatedAt = c.UpdatedAt } // SQLite UPSERT — INSERT...ON CONFLICT(id) DO UPDATE. _, err := s.db.Exec( `INSERT INTO containers (`+containerColumns+`) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ON CONFLICT(id) DO UPDATE SET workload_id=excluded.workload_id, workload_kind=excluded.workload_kind, role=excluded.role, container_id=excluded.container_id, image_ref=excluded.image_ref, image_tag=excluded.image_tag, host=excluded.host, state=excluded.state, port=excluded.port, subdomain=excluded.subdomain, proxy_route_id=excluded.proxy_route_id, npm_proxy_id=excluded.npm_proxy_id, last_seen_at=excluded.last_seen_at, extra_json=excluded.extra_json, updated_at=excluded.updated_at`, c.ID, c.WorkloadID, c.WorkloadKind, c.Role, c.ContainerID, c.ImageRef, c.ImageTag, c.Host, c.State, c.Port, c.Subdomain, c.ProxyRouteID, c.NpmProxyID, c.LastSeenAt, c.ExtraJSON, c.CreatedAt, c.UpdatedAt, ) if err != nil { return fmt.Errorf("upsert container: %w", err) } return nil } // GetContainerByID returns a single container row. func (s *Store) GetContainerByID(id string) (Container, error) { c, err := scanContainer(s.db.QueryRow( `SELECT `+containerColumns+` FROM containers WHERE id = ?`, id, )) if errors.Is(err, sql.ErrNoRows) { return Container{}, fmt.Errorf("container %s: %w", id, ErrNotFound) } if err != nil { return Container{}, fmt.Errorf("query container: %w", err) } return c, nil } // GetContainerByDockerID looks up a container row by its Docker container ID. // The reconciler uses this to decide between insert (new container) and update. func (s *Store) GetContainerByDockerID(dockerID string) (Container, error) { if dockerID == "" { return Container{}, fmt.Errorf("empty container_id: %w", ErrNotFound) } c, err := scanContainer(s.db.QueryRow( `SELECT `+containerColumns+` FROM containers WHERE container_id = ?`, dockerID, )) if errors.Is(err, sql.ErrNoRows) { return Container{}, ErrNotFound } if err != nil { return Container{}, fmt.Errorf("query container by docker id: %w", err) } return c, nil } // ListProxyRoutes returns proxy-enabled project containers joined with // project + stage names. Reads from the normalized containers index. Stage // ID is resolved through a (project_id, role=stage_name) join, which is // uniquely indexed via UNIQUE(project_id, name) on stages. // // Source is reported as "instance" for back-compat with the Proxies page // filter (the frontend keys off the literal string). func (s *Store) ListProxyRoutes(domain string) ([]ProxyRoute, error) { rows, err := s.db.Query(` SELECT c.id, p.id, p.name, s.id, s.name, c.image_tag, c.subdomain, c.container_id, c.port, c.proxy_route_id, c.npm_proxy_id, c.state, c.created_at FROM containers c JOIN workloads w ON w.id = c.workload_id AND w.kind = 'project' JOIN projects p ON p.id = w.ref_id JOIN stages s ON s.project_id = p.id AND s.name = c.role WHERE c.subdomain != '' AND (c.proxy_route_id != '' OR c.npm_proxy_id > 0) ORDER BY p.name, s.name, c.created_at DESC`, ) if err != nil { return nil, fmt.Errorf("query proxy routes: %w", err) } defer rows.Close() routes := []ProxyRoute{} for rows.Next() { var r ProxyRoute if err := rows.Scan( &r.InstanceID, &r.ProjectID, &r.ProjectName, &r.StageID, &r.StageName, &r.ImageTag, &r.Subdomain, &r.ContainerID, &r.Port, &r.ProxyRouteID, &r.NpmProxyID, &r.Status, &r.CreatedAt, ); err != nil { return nil, fmt.Errorf("scan proxy route: %w", err) } r.Source = "instance" if domain != "" && r.Subdomain != "" { r.Domain = r.Subdomain + "." + domain } routes = append(routes, r) } return routes, rows.Err() } // ListContainersByStageID returns project containers for the given stage, // newest first. Resolves stage → project_id → workload(kind=project) → // containers with role = stage.name. Replaces GetInstancesByStageID for // callers in the deployer / API layer. func (s *Store) ListContainersByStageID(stageID string) ([]Container, error) { rows, err := s.db.Query(` SELECT `+prefixCols(containerColumns, "c.")+` FROM containers c JOIN workloads w ON w.id = c.workload_id AND w.kind = 'project' JOIN stages s ON s.project_id = w.ref_id AND s.name = c.role WHERE s.id = ? ORDER BY c.created_at DESC`, stageID) if err != nil { return nil, fmt.Errorf("query containers by stage: %w", err) } defer rows.Close() out := []Container{} for rows.Next() { c, err := scanContainer(rows) if err != nil { return nil, fmt.Errorf("scan container: %w", err) } out = append(out, c) } return out, rows.Err() } // ListContainersByWorkload returns all containers for a given workload, newest first. func (s *Store) ListContainersByWorkload(workloadID string) ([]Container, error) { rows, err := s.db.Query( `SELECT `+containerColumns+` FROM containers WHERE workload_id = ? ORDER BY created_at DESC`, workloadID, ) if err != nil { return nil, fmt.Errorf("query containers by workload: %w", err) } defer rows.Close() out := []Container{} for rows.Next() { c, err := scanContainer(rows) if err != nil { return nil, fmt.Errorf("scan container: %w", err) } out = append(out, c) } return out, rows.Err() } // ContainerFilter narrows ListContainers. Empty fields mean "no filter". type ContainerFilter struct { WorkloadID string WorkloadKind string State string AppID string } // ListContainers returns all containers matching the filter, newest first. // AppID joins through workloads. func (s *Store) ListContainers(f ContainerFilter) ([]Container, error) { var ( where []string args []any ) if f.WorkloadID != "" { where = append(where, "c.workload_id = ?") args = append(args, f.WorkloadID) } if f.WorkloadKind != "" { where = append(where, "c.workload_kind = ?") args = append(args, f.WorkloadKind) } if f.State != "" { where = append(where, "c.state = ?") args = append(args, f.State) } var query string if f.AppID != "" { query = `SELECT ` + prefixCols(containerColumns, "c.") + ` FROM containers c JOIN workloads w ON w.id = c.workload_id WHERE w.app_id = ?` args = append([]any{f.AppID}, args...) if len(where) > 0 { query += " AND " + strings.Join(where, " AND ") } query += " ORDER BY c.created_at DESC" } else { query = `SELECT ` + prefixCols(containerColumns, "c.") + ` FROM containers c` if len(where) > 0 { query += " WHERE " + strings.Join(where, " AND ") } query += " ORDER BY c.created_at DESC" } rows, err := s.db.Query(query, args...) if err != nil { return nil, fmt.Errorf("query containers: %w", err) } defer rows.Close() out := []Container{} for rows.Next() { c, err := scanContainer(rows) if err != nil { return nil, fmt.Errorf("scan container: %w", err) } out = append(out, c) } return out, rows.Err() } // UpdateContainer replaces all mutable fields on an existing container row. // Use this from the deployer when proxy / subdomain assignments change. func (s *Store) UpdateContainer(c Container) error { c.UpdatedAt = Now() if c.ExtraJSON == "" { c.ExtraJSON = "{}" } result, err := s.db.Exec( `UPDATE containers SET workload_id=?, workload_kind=?, role=?, container_id=?, image_ref=?, image_tag=?, host=?, state=?, port=?, subdomain=?, proxy_route_id=?, npm_proxy_id=?, last_seen_at=?, extra_json=?, updated_at=? WHERE id=?`, c.WorkloadID, c.WorkloadKind, c.Role, c.ContainerID, c.ImageRef, c.ImageTag, c.Host, c.State, c.Port, c.Subdomain, c.ProxyRouteID, c.NpmProxyID, c.LastSeenAt, c.ExtraJSON, c.UpdatedAt, c.ID, ) if err != nil { return fmt.Errorf("update container: %w", err) } n, _ := result.RowsAffected() if n == 0 { return fmt.Errorf("container %s: %w", c.ID, ErrNotFound) } return nil } // UpdateContainerState sets only the state and last_seen_at fields. // Used by the reconciler for rapid status flips without a full row read. func (s *Store) UpdateContainerState(id, state string) error { ts := Now() result, err := s.db.Exec( `UPDATE containers SET state=?, last_seen_at=?, updated_at=? WHERE id=?`, state, ts, ts, id, ) if err != nil { return fmt.Errorf("update container state: %w", err) } n, _ := result.RowsAffected() if n == 0 { return fmt.Errorf("container %s: %w", id, ErrNotFound) } return nil } // MarkContainerMissing flips state to 'missing' for a container that the // reconciler can no longer find in `docker ps`. Idempotent. func (s *Store) MarkContainerMissing(id string) error { return s.UpdateContainerState(id, "missing") } // DeleteContainer removes a container row. Use when the underlying Docker // container has been deleted and we want to forget it entirely (vs. marking missing). func (s *Store) DeleteContainer(id string) error { result, err := s.db.Exec(`DELETE FROM containers WHERE id = ?`, id) if err != nil { return fmt.Errorf("delete container: %w", err) } n, _ := result.RowsAffected() if n == 0 { return fmt.Errorf("container %s: %w", id, ErrNotFound) } return nil } // DeleteContainersByWorkload removes every container row for a workload. // Used when a workload is deleted and we want to drop its container index entries. func (s *Store) DeleteContainersByWorkload(workloadID string) error { _, err := s.db.Exec(`DELETE FROM containers WHERE workload_id = ?`, workloadID) if err != nil { return fmt.Errorf("delete containers by workload: %w", err) } return nil } // prefixCols rewrites a comma-separated column list to use a table alias prefix. // Used by ListContainers when joining containers (alias `c`) to workloads. func prefixCols(cols, prefix string) string { parts := strings.Split(cols, ",") for i, p := range parts { parts[i] = prefix + strings.TrimSpace(p) } return strings.Join(parts, ", ") }