91b49cb5ed
- Expand health endpoint to check DB, Docker, and NPM connectivity (FUNC-M4) - Add project_id, stage_id, offset query params to deploys endpoint (FUNC-M5, FUNC-M6) - Add notification_url field to Stage model for per-project overrides (FUNC-M2) - Add NPM Ping method for health checking - Sanitize all internal error messages in API handlers (SEC-M4) - Add audit trail events for admin actions (FUNC-M3) - Add EventLog event type to event bus
213 lines
6.2 KiB
Go
213 lines
6.2 KiB
Go
package store
|
|
|
|
import (
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"strings"
|
|
|
|
"github.com/google/uuid"
|
|
)
|
|
|
|
// CreateDeploy inserts a new deploy record.
|
|
func (s *Store) CreateDeploy(d Deploy) (Deploy, error) {
|
|
d.ID = uuid.New().String()
|
|
d.StartedAt = Now()
|
|
if d.Status == "" {
|
|
d.Status = "pending"
|
|
}
|
|
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO deploys (id, project_id, stage_id, instance_id, image_tag, status, started_at, finished_at, error)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)`,
|
|
d.ID, d.ProjectID, d.StageID, d.InstanceID, d.ImageTag, d.Status, d.StartedAt, d.FinishedAt, d.Error,
|
|
)
|
|
if err != nil {
|
|
return Deploy{}, fmt.Errorf("insert deploy: %w", err)
|
|
}
|
|
return d, nil
|
|
}
|
|
|
|
// GetDeployByID returns a single deploy by its ID.
|
|
func (s *Store) GetDeployByID(id string) (Deploy, error) {
|
|
var d Deploy
|
|
err := s.db.QueryRow(
|
|
`SELECT id, project_id, stage_id, instance_id, image_tag, status, started_at, finished_at, error
|
|
FROM deploys WHERE id = ?`, id,
|
|
).Scan(&d.ID, &d.ProjectID, &d.StageID, &d.InstanceID, &d.ImageTag, &d.Status, &d.StartedAt, &d.FinishedAt, &d.Error)
|
|
if errors.Is(err, sql.ErrNoRows) {
|
|
return Deploy{}, fmt.Errorf("deploy %s: %w", id, ErrNotFound)
|
|
}
|
|
if err != nil {
|
|
return Deploy{}, fmt.Errorf("query deploy: %w", err)
|
|
}
|
|
return d, nil
|
|
}
|
|
|
|
// GetDeploysByProjectID returns all deploys for a project, newest first.
|
|
func (s *Store) GetDeploysByProjectID(projectID string) ([]Deploy, error) {
|
|
rows, err := s.db.Query(
|
|
`SELECT id, project_id, stage_id, instance_id, image_tag, status, started_at, finished_at, error
|
|
FROM deploys WHERE project_id = ? ORDER BY started_at DESC`, projectID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query deploys: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
return scanDeploys(rows)
|
|
}
|
|
|
|
// GetRecentDeploys returns the most recent deploys across all projects.
|
|
func (s *Store) GetRecentDeploys(limit int) ([]Deploy, error) {
|
|
rows, err := s.db.Query(
|
|
`SELECT id, project_id, stage_id, instance_id, image_tag, status, started_at, finished_at, error
|
|
FROM deploys ORDER BY started_at DESC LIMIT ?`, limit,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query recent deploys: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
return scanDeploys(rows)
|
|
}
|
|
|
|
// UpdateDeployStatus sets the status (and optionally error and finished_at) on a deploy.
|
|
func (s *Store) UpdateDeployStatus(id string, status string, deployErr string) error {
|
|
ts := Now()
|
|
var finishedAt string
|
|
if IsTerminalDeployStatus(status) {
|
|
finishedAt = ts
|
|
}
|
|
|
|
result, err := s.db.Exec(
|
|
`UPDATE deploys SET status=?, error=?, finished_at=? WHERE id=?`,
|
|
status, deployErr, finishedAt, id,
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("update deploy status: %w", err)
|
|
}
|
|
n, _ := result.RowsAffected()
|
|
if n == 0 {
|
|
return fmt.Errorf("deploy %s: %w", id, ErrNotFound)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// SetDeployInstanceID links a deploy to the instance it created.
|
|
func (s *Store) SetDeployInstanceID(deployID string, instanceID string) error {
|
|
result, err := s.db.Exec(`UPDATE deploys SET instance_id=? WHERE id=?`, instanceID, deployID)
|
|
if err != nil {
|
|
return fmt.Errorf("set deploy instance: %w", err)
|
|
}
|
|
n, _ := result.RowsAffected()
|
|
if n == 0 {
|
|
return fmt.Errorf("deploy %s: %w", deployID, ErrNotFound)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// AppendDeployLog adds a log entry for a deploy.
|
|
func (s *Store) AppendDeployLog(deployID string, message string, level string) error {
|
|
if level == "" {
|
|
level = "info"
|
|
}
|
|
_, err := s.db.Exec(
|
|
`INSERT INTO deploy_logs (deploy_id, message, level, created_at) VALUES (?, ?, ?, ?)`,
|
|
deployID, message, level, Now(),
|
|
)
|
|
if err != nil {
|
|
return fmt.Errorf("append deploy log: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// GetDeployLogs returns all log entries for a deploy, ordered chronologically.
|
|
func (s *Store) GetDeployLogs(deployID string) ([]DeployLog, error) {
|
|
rows, err := s.db.Query(
|
|
`SELECT id, deploy_id, message, level, created_at
|
|
FROM deploy_logs WHERE deploy_id = ? ORDER BY id`, deployID,
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query deploy logs: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
logs := []DeployLog{}
|
|
for rows.Next() {
|
|
var l DeployLog
|
|
if err := rows.Scan(&l.ID, &l.DeployID, &l.Message, &l.Level, &l.CreatedAt); err != nil {
|
|
return nil, fmt.Errorf("scan deploy log: %w", err)
|
|
}
|
|
logs = append(logs, l)
|
|
}
|
|
return logs, rows.Err()
|
|
}
|
|
|
|
// scanDeploys is a helper that scans deploy rows from a cursor.
|
|
func scanDeploys(rows *sql.Rows) ([]Deploy, error) {
|
|
deploys := []Deploy{}
|
|
for rows.Next() {
|
|
var d Deploy
|
|
if err := rows.Scan(&d.ID, &d.ProjectID, &d.StageID, &d.InstanceID, &d.ImageTag, &d.Status, &d.StartedAt, &d.FinishedAt, &d.Error); err != nil {
|
|
return nil, fmt.Errorf("scan deploy: %w", err)
|
|
}
|
|
deploys = append(deploys, d)
|
|
}
|
|
return deploys, rows.Err()
|
|
}
|
|
|
|
// IsTerminalDeployStatus reports whether status is one of the end states
// "success", "failed" or "rolled_back". The comparison is exact
// (case-sensitive, no trimming); every other value, including the empty
// string, yields false.
func IsTerminalDeployStatus(status string) bool {
	return status == "success" || status == "failed" || status == "rolled_back"
}
|
|
|
|
// GetDeploys returns deploys with optional filtering by project and stage, with pagination.
|
|
func (s *Store) GetDeploys(projectID, stageID string, limit, offset int) ([]Deploy, error) {
|
|
query := `SELECT id, project_id, stage_id, instance_id, image_tag, status, started_at, finished_at, error FROM deploys`
|
|
var args []any
|
|
var conditions []string
|
|
|
|
if projectID != "" {
|
|
conditions = append(conditions, "project_id = ?")
|
|
args = append(args, projectID)
|
|
}
|
|
if stageID != "" {
|
|
conditions = append(conditions, "stage_id = ?")
|
|
args = append(args, stageID)
|
|
}
|
|
|
|
if len(conditions) > 0 {
|
|
query += " WHERE " + strings.Join(conditions, " AND ")
|
|
}
|
|
query += " ORDER BY started_at DESC LIMIT ? OFFSET ?"
|
|
args = append(args, limit, offset)
|
|
|
|
rows, err := s.db.Query(query, args...)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query deploys: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
return scanDeploys(rows)
|
|
}
|
|
|
|
// CleanupOldDeploys removes deploy records and their logs older than the given
|
|
// number of days. Returns the number of deploys removed.
|
|
func (s *Store) CleanupOldDeploys(retentionDays int) (int64, error) {
|
|
cutoff := fmt.Sprintf("-%d days", retentionDays)
|
|
result, err := s.db.Exec(
|
|
`DELETE FROM deploys WHERE started_at < datetime('now', ?)`, cutoff,
|
|
)
|
|
if err != nil {
|
|
return 0, fmt.Errorf("cleanup old deploys: %w", err)
|
|
}
|
|
n, _ := result.RowsAffected()
|
|
return n, nil
|
|
}
|