feat(observability): phase 1 - schema, models & event log backend

Add database foundation for observability features:
- event_log table with severity/source filtering and pagination
- standalone_proxies table for user-created reverse proxies
- stale_threshold_days setting (default 7 days)
- Auto-persist warn/error events from event bus to database
- SSE broadcast of persistent events for real-time UI updates
- Frontend types and API functions for downstream UI phases
This commit is contained in:
2026-03-30 10:59:13 +03:00
parent f71c314262
commit c38b7d4c78
23 changed files with 1149 additions and 20 deletions
+148
View File
@@ -0,0 +1,148 @@
package store
import (
"fmt"
"strings"
)
// EventLogFilter holds optional filters for listing event log entries.
type EventLogFilter struct {
Severity string // Filter by severity (info, warn, error).
Source string // Filter by source.
Since string // Only events created at or after this timestamp.
Until string // Only events created at or before this timestamp.
Limit int // Maximum number of results (default 50).
Offset int // Offset for pagination.
}
// EventLogStats holds counts of event log entries by severity.
type EventLogStats struct {
Info int `json:"info"`
Warn int `json:"warn"`
Error int `json:"error"`
Total int `json:"total"`
}
// InsertEvent inserts a new event log entry.
func (s *Store) InsertEvent(evt EventLog) (EventLog, error) {
evt.CreatedAt = Now()
if evt.Metadata == "" {
evt.Metadata = "{}"
}
result, err := s.db.Exec(
`INSERT INTO event_log (source, severity, message, metadata, created_at)
VALUES (?, ?, ?, ?, ?)`,
evt.Source, evt.Severity, evt.Message, evt.Metadata, evt.CreatedAt,
)
if err != nil {
return EventLog{}, fmt.Errorf("insert event: %w", err)
}
id, err := result.LastInsertId()
if err != nil {
return EventLog{}, fmt.Errorf("get event id: %w", err)
}
evt.ID = id
return evt, nil
}
// ListEvents returns event log entries matching the given filter.
func (s *Store) ListEvents(filter EventLogFilter) ([]EventLog, error) {
var conditions []string
var args []any
if filter.Severity != "" {
conditions = append(conditions, "severity = ?")
args = append(args, filter.Severity)
}
if filter.Source != "" {
conditions = append(conditions, "source = ?")
args = append(args, filter.Source)
}
if filter.Since != "" {
conditions = append(conditions, "created_at >= ?")
args = append(args, filter.Since)
}
if filter.Until != "" {
conditions = append(conditions, "created_at <= ?")
args = append(args, filter.Until)
}
query := "SELECT id, source, severity, message, metadata, created_at FROM event_log"
if len(conditions) > 0 {
query += " WHERE " + strings.Join(conditions, " AND ")
}
query += " ORDER BY created_at DESC"
limit := filter.Limit
if limit <= 0 {
limit = 50
}
if limit > 500 {
limit = 500
}
query += fmt.Sprintf(" LIMIT %d OFFSET %d", limit, filter.Offset)
rows, err := s.db.Query(query, args...)
if err != nil {
return nil, fmt.Errorf("query events: %w", err)
}
defer rows.Close()
events := []EventLog{}
for rows.Next() {
var evt EventLog
if err := rows.Scan(&evt.ID, &evt.Source, &evt.Severity, &evt.Message, &evt.Metadata, &evt.CreatedAt); err != nil {
return nil, fmt.Errorf("scan event: %w", err)
}
events = append(events, evt)
}
return events, rows.Err()
}
// GetEventStats returns counts of event log entries grouped by severity.
func (s *Store) GetEventStats() (EventLogStats, error) {
rows, err := s.db.Query(
`SELECT severity, COUNT(*) FROM event_log GROUP BY severity`,
)
if err != nil {
return EventLogStats{}, fmt.Errorf("query event stats: %w", err)
}
defer rows.Close()
var stats EventLogStats
for rows.Next() {
var severity string
var count int
if err := rows.Scan(&severity, &count); err != nil {
return EventLogStats{}, fmt.Errorf("scan event stats: %w", err)
}
switch severity {
case "info":
stats.Info = count
case "warn":
stats.Warn = count
case "error":
stats.Error = count
}
stats.Total += count
}
return stats, rows.Err()
}
// PruneEvents deletes event log entries older than the given number of days.
func (s *Store) PruneEvents(olderThanDays int) (int64, error) {
if olderThanDays < 1 {
return 0, fmt.Errorf("prune events: olderThanDays must be >= 1, got %d", olderThanDays)
}
result, err := s.db.Exec(
`DELETE FROM event_log WHERE created_at < datetime('now', ?)`,
fmt.Sprintf("-%d days", olderThanDays),
)
if err != nil {
return 0, fmt.Errorf("prune events: %w", err)
}
return result.RowsAffected()
}
+27 -2
View File
@@ -55,8 +55,9 @@ type Settings struct {
WebhookSecret string `json:"webhook_secret"`
PollingInterval string `json:"polling_interval"`
BaseVolumePath string `json:"base_volume_path"`
SSLCertificateID int `json:"ssl_certificate_id"`
UpdatedAt string `json:"updated_at"`
SSLCertificateID int `json:"ssl_certificate_id"`
StaleThresholdDays int `json:"stale_threshold_days"`
UpdatedAt string `json:"updated_at"`
}
// Instance represents a running (or stopped) container for a project stage.
@@ -117,3 +118,27 @@ type Volume struct {
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
// EventLog represents a persistent event log entry.
type EventLog struct {
ID int64 `json:"id"`
Source string `json:"source"`
Severity string `json:"severity"` // info, warn, error
Message string `json:"message"`
Metadata string `json:"metadata"` // JSON-encoded structured data
CreatedAt string `json:"created_at"`
}
// StandaloneProxy represents a standalone reverse proxy not tied to a project.
type StandaloneProxy struct {
ID string `json:"id"`
Domain string `json:"domain"`
DestinationURL string `json:"destination_url"`
DestinationPort int `json:"destination_port"`
SSLCertificateID int `json:"ssl_certificate_id"`
NpmProxyID int `json:"npm_proxy_id"`
HealthStatus string `json:"health_status"` // unknown, healthy, unhealthy
HealthCheckedAt string `json:"health_checked_at"`
CreatedAt string `json:"created_at"`
UpdatedAt string `json:"updated_at"`
}
+4 -4
View File
@@ -9,10 +9,10 @@ func (s *Store) GetSettings() (Settings, error) {
var st Settings
err := s.db.QueryRow(
`SELECT domain, server_ip, network, subdomain_pattern, notification_url,
npm_url, npm_email, npm_password, webhook_secret, polling_interval, base_volume_path, ssl_certificate_id, updated_at
npm_url, npm_email, npm_password, webhook_secret, polling_interval, base_volume_path, ssl_certificate_id, stale_threshold_days, updated_at
FROM settings WHERE id = 1`,
).Scan(&st.Domain, &st.ServerIP, &st.Network, &st.SubdomainPattern, &st.NotificationURL,
&st.NpmURL, &st.NpmEmail, &st.NpmPassword, &st.WebhookSecret, &st.PollingInterval, &st.BaseVolumePath, &st.SSLCertificateID, &st.UpdatedAt)
&st.NpmURL, &st.NpmEmail, &st.NpmPassword, &st.WebhookSecret, &st.PollingInterval, &st.BaseVolumePath, &st.SSLCertificateID, &st.StaleThresholdDays, &st.UpdatedAt)
if err != nil {
return Settings{}, fmt.Errorf("query settings: %w", err)
}
@@ -25,10 +25,10 @@ func (s *Store) UpdateSettings(st Settings) error {
_, err := s.db.Exec(
`UPDATE settings SET
domain=?, server_ip=?, network=?, subdomain_pattern=?, notification_url=?,
npm_url=?, npm_email=?, npm_password=?, webhook_secret=?, polling_interval=?, base_volume_path=?, ssl_certificate_id=?, updated_at=?
npm_url=?, npm_email=?, npm_password=?, webhook_secret=?, polling_interval=?, base_volume_path=?, ssl_certificate_id=?, stale_threshold_days=?, updated_at=?
WHERE id = 1`,
st.Domain, st.ServerIP, st.Network, st.SubdomainPattern, st.NotificationURL,
st.NpmURL, st.NpmEmail, st.NpmPassword, st.WebhookSecret, st.PollingInterval, st.BaseVolumePath, st.SSLCertificateID, st.UpdatedAt,
st.NpmURL, st.NpmEmail, st.NpmPassword, st.WebhookSecret, st.PollingInterval, st.BaseVolumePath, st.SSLCertificateID, st.StaleThresholdDays, st.UpdatedAt,
)
if err != nil {
return fmt.Errorf("update settings: %w", err)
+120
View File
@@ -0,0 +1,120 @@
package store
import (
"database/sql"
"errors"
"fmt"
"github.com/google/uuid"
)
// CreateStandaloneProxy inserts a new standalone proxy record.
func (s *Store) CreateStandaloneProxy(p StandaloneProxy) (StandaloneProxy, error) {
p.ID = uuid.New().String()
p.CreatedAt = Now()
p.UpdatedAt = p.CreatedAt
if p.HealthStatus == "" {
p.HealthStatus = "unknown"
}
_, err := s.db.Exec(
`INSERT INTO standalone_proxies (id, domain, destination_url, destination_port, ssl_certificate_id, npm_proxy_id, health_status, health_checked_at, created_at, updated_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`,
p.ID, p.Domain, p.DestinationURL, p.DestinationPort, p.SSLCertificateID,
p.NpmProxyID, p.HealthStatus, p.HealthCheckedAt, p.CreatedAt, p.UpdatedAt,
)
if err != nil {
return StandaloneProxy{}, fmt.Errorf("insert standalone proxy: %w", err)
}
return p, nil
}
// GetStandaloneProxy returns a standalone proxy by ID.
func (s *Store) GetStandaloneProxy(id string) (StandaloneProxy, error) {
var p StandaloneProxy
err := s.db.QueryRow(
`SELECT id, domain, destination_url, destination_port, ssl_certificate_id, npm_proxy_id, health_status, health_checked_at, created_at, updated_at
FROM standalone_proxies WHERE id = ?`, id,
).Scan(&p.ID, &p.Domain, &p.DestinationURL, &p.DestinationPort, &p.SSLCertificateID,
&p.NpmProxyID, &p.HealthStatus, &p.HealthCheckedAt, &p.CreatedAt, &p.UpdatedAt)
if errors.Is(err, sql.ErrNoRows) {
return StandaloneProxy{}, fmt.Errorf("standalone proxy %s: %w", id, ErrNotFound)
}
if err != nil {
return StandaloneProxy{}, fmt.Errorf("query standalone proxy: %w", err)
}
return p, nil
}
// ListStandaloneProxies returns all standalone proxy records ordered by creation time.
func (s *Store) ListStandaloneProxies() ([]StandaloneProxy, error) {
rows, err := s.db.Query(
`SELECT id, domain, destination_url, destination_port, ssl_certificate_id, npm_proxy_id, health_status, health_checked_at, created_at, updated_at
FROM standalone_proxies ORDER BY created_at DESC`,
)
if err != nil {
return nil, fmt.Errorf("query standalone proxies: %w", err)
}
defer rows.Close()
proxies := []StandaloneProxy{}
for rows.Next() {
var p StandaloneProxy
if err := rows.Scan(&p.ID, &p.Domain, &p.DestinationURL, &p.DestinationPort, &p.SSLCertificateID,
&p.NpmProxyID, &p.HealthStatus, &p.HealthCheckedAt, &p.CreatedAt, &p.UpdatedAt); err != nil {
return nil, fmt.Errorf("scan standalone proxy: %w", err)
}
proxies = append(proxies, p)
}
return proxies, rows.Err()
}
// UpdateStandaloneProxy updates an existing standalone proxy's mutable fields.
func (s *Store) UpdateStandaloneProxy(p StandaloneProxy) error {
p.UpdatedAt = Now()
result, err := s.db.Exec(
`UPDATE standalone_proxies SET domain=?, destination_url=?, destination_port=?, ssl_certificate_id=?, npm_proxy_id=?, health_status=?, health_checked_at=?, updated_at=?
WHERE id=?`,
p.Domain, p.DestinationURL, p.DestinationPort, p.SSLCertificateID,
p.NpmProxyID, p.HealthStatus, p.HealthCheckedAt, p.UpdatedAt, p.ID,
)
if err != nil {
return fmt.Errorf("update standalone proxy: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("standalone proxy %s: %w", p.ID, ErrNotFound)
}
return nil
}
// DeleteStandaloneProxy removes a standalone proxy by ID.
func (s *Store) DeleteStandaloneProxy(id string) error {
result, err := s.db.Exec(`DELETE FROM standalone_proxies WHERE id = ?`, id)
if err != nil {
return fmt.Errorf("delete standalone proxy: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("standalone proxy %s: %w", id, ErrNotFound)
}
return nil
}
// UpdateProxyHealth updates the health status and check timestamp for a standalone proxy.
func (s *Store) UpdateProxyHealth(id string, status string) error {
ts := Now()
result, err := s.db.Exec(
`UPDATE standalone_proxies SET health_status=?, health_checked_at=?, updated_at=? WHERE id=?`,
status, ts, ts, id,
)
if err != nil {
return fmt.Errorf("update proxy health: %w", err)
}
n, _ := result.RowsAffected()
if n == 0 {
return fmt.Errorf("standalone proxy %s: %w", id, ErrNotFound)
}
return nil
}
+27
View File
@@ -81,6 +81,8 @@ func (s *Store) runMigrations() error {
`ALTER TABLE stages ADD COLUMN enable_proxy INTEGER NOT NULL DEFAULT 1`,
// Add ssl_certificate_id to settings (2026-03-29).
`ALTER TABLE settings ADD COLUMN ssl_certificate_id INTEGER NOT NULL DEFAULT 0`,
// Add stale_threshold_days to settings (2026-03-30).
`ALTER TABLE settings ADD COLUMN stale_threshold_days INTEGER NOT NULL DEFAULT 7`,
}
for _, m := range migrations {
@@ -98,6 +100,9 @@ func (s *Store) runMigrations() error {
`CREATE INDEX IF NOT EXISTS idx_stages_project_id ON stages(project_id)`,
`CREATE INDEX IF NOT EXISTS idx_stage_env_stage_id ON stage_env(stage_id)`,
`CREATE INDEX IF NOT EXISTS idx_volumes_project_id ON volumes(project_id)`,
`CREATE INDEX IF NOT EXISTS idx_event_log_severity ON event_log(severity)`,
`CREATE INDEX IF NOT EXISTS idx_event_log_source ON event_log(source)`,
`CREATE INDEX IF NOT EXISTS idx_event_log_created_at ON event_log(created_at)`,
}
for _, idx := range indexes {
if _, err := s.db.Exec(idx); err != nil {
@@ -250,6 +255,28 @@ CREATE TABLE IF NOT EXISTS volumes (
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS event_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source TEXT NOT NULL DEFAULT '',
severity TEXT NOT NULL DEFAULT 'info',
message TEXT NOT NULL DEFAULT '',
metadata TEXT NOT NULL DEFAULT '{}',
created_at TEXT NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS standalone_proxies (
id TEXT PRIMARY KEY,
domain TEXT NOT NULL UNIQUE,
destination_url TEXT NOT NULL DEFAULT '',
destination_port INTEGER NOT NULL DEFAULT 0,
ssl_certificate_id INTEGER NOT NULL DEFAULT 0,
npm_proxy_id INTEGER NOT NULL DEFAULT 0,
health_status TEXT NOT NULL DEFAULT 'unknown',
health_checked_at TEXT NOT NULL DEFAULT '',
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now'))
);
`
// Now returns the current time formatted for SQLite storage.