feat(observability): phase 3 - direct proxy creation with validation
Add standalone proxy management:
- Multi-step validation pipeline (DNS, TCP, HTTP) with diagnostic hints
- Proxy lifecycle: create/update/delete via NPM API with SSL auto-assign
- Periodic health monitoring (every 5 minutes) with an event log entry on status transitions
- Unified /api/proxies/all endpoint merging standalone and managed proxies
- Frontend types and API functions for downstream UI phases
This commit is contained in:
@@ -0,0 +1,184 @@
|
||||
package proxy
|
||||
|
||||
import (
	"context"
	"encoding/json"
	"fmt"
	"log/slog"
	"net"
	"net/http"
	"strconv"
	"sync"
	"time"

	"github.com/alexei/docker-watcher/internal/events"
	"github.com/alexei/docker-watcher/internal/store"
	"github.com/robfig/cron/v3"
)
|
||||
|
||||
// HealthMonitor periodically checks the health of all standalone proxies.
|
||||
type HealthMonitor struct {
|
||||
store *store.Store
|
||||
eventBus *events.Bus
|
||||
|
||||
cron *cron.Cron
|
||||
mu sync.Mutex
|
||||
entryID cron.EntryID
|
||||
running bool
|
||||
}
|
||||
|
||||
// NewHealthMonitor creates a new proxy health monitor.
|
||||
func NewHealthMonitor(st *store.Store, eventBus *events.Bus) *HealthMonitor {
|
||||
return &HealthMonitor{
|
||||
store: st,
|
||||
eventBus: eventBus,
|
||||
cron: cron.New(),
|
||||
}
|
||||
}
|
||||
|
||||
// Start begins periodic health checks with the given interval (e.g., "5m", "1m").
|
||||
// If already running, it stops and restarts with the new interval.
|
||||
func (h *HealthMonitor) Start(interval string) error {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
duration, err := time.ParseDuration(interval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("parse health check interval %q: %w", interval, err)
|
||||
}
|
||||
|
||||
if h.running {
|
||||
h.cron.Remove(h.entryID)
|
||||
}
|
||||
|
||||
spec := fmt.Sprintf("@every %s", duration.String())
|
||||
entryID, err := h.cron.AddFunc(spec, func() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute)
|
||||
defer cancel()
|
||||
if checkErr := h.CheckAll(ctx); checkErr != nil {
|
||||
slog.Warn("proxy health monitor: check error", "error", checkErr)
|
||||
}
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("schedule proxy health monitor: %w", err)
|
||||
}
|
||||
|
||||
h.entryID = entryID
|
||||
if !h.running {
|
||||
h.cron.Start()
|
||||
}
|
||||
h.running = true
|
||||
|
||||
slog.Info("proxy health monitor started", "interval", duration.String())
|
||||
return nil
|
||||
}
|
||||
|
||||
// Stop gracefully shuts down the health monitor.
|
||||
func (h *HealthMonitor) Stop() {
|
||||
h.mu.Lock()
|
||||
defer h.mu.Unlock()
|
||||
|
||||
if h.running {
|
||||
ctx := h.cron.Stop()
|
||||
<-ctx.Done()
|
||||
h.running = false
|
||||
slog.Info("proxy health monitor stopped")
|
||||
}
|
||||
}
|
||||
|
||||
// CheckAll performs a single health check cycle for all standalone proxies.
|
||||
func (h *HealthMonitor) CheckAll(ctx context.Context) error {
|
||||
proxies, err := h.store.ListStandaloneProxies()
|
||||
if err != nil {
|
||||
return fmt.Errorf("list standalone proxies: %w", err)
|
||||
}
|
||||
|
||||
for _, proxy := range proxies {
|
||||
newStatus := checkProxyHealth(ctx, proxy.DestinationURL, proxy.DestinationPort)
|
||||
oldStatus := proxy.HealthStatus
|
||||
|
||||
if err := h.store.UpdateProxyHealth(proxy.ID, newStatus); err != nil {
|
||||
slog.Warn("proxy health monitor: failed to update health",
|
||||
"proxy_id", proxy.ID, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Emit event on status change.
|
||||
if oldStatus != newStatus && oldStatus != "unknown" {
|
||||
h.emitHealthEvent(proxy, oldStatus, newStatus)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// checkProxyHealth issues an HTTP GET to http://host:port/ and classifies the
// destination as "healthy" or "unhealthy".
//
// host may be a hostname, an IPv4 address, or an IPv6 literal with or without
// surrounding brackets. The probe is limited to 10 seconds and does not follow
// redirects. Any response with a status below 500 counts as healthy; a
// transport error, timeout, or 5xx status counts as unhealthy.
//
// NOTE(review): CheckAll passes proxy.DestinationURL as host; this assumes
// that field holds a bare host without a scheme or path. Confirm against the
// store.
func checkProxyHealth(ctx context.Context, host string, port int) string {
	// Strip brackets from an already-bracketed IPv6 literal so JoinHostPort
	// does not wrap it a second time.
	if n := len(host); n >= 2 && host[0] == '[' && host[n-1] == ']' {
		host = host[1 : n-1]
	}
	// JoinHostPort brackets IPv6 literals, which a plain "%s:%d" does not.
	target := "http://" + net.JoinHostPort(host, strconv.Itoa(port)) + "/"

	reqCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, target, nil)
	if err != nil {
		return "unhealthy"
	}

	client := &http.Client{
		Timeout: 10 * time.Second,
		// Report on the destination itself rather than whatever it redirects to.
		CheckRedirect: func(*http.Request, []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}

	resp, err := client.Do(req)
	if err != nil {
		return "unhealthy"
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 500 {
		return "unhealthy"
	}

	return "healthy"
}
|
||||
|
||||
// emitHealthEvent persists and publishes a health status change event.
|
||||
func (h *HealthMonitor) emitHealthEvent(proxy store.StandaloneProxy, oldStatus, newStatus string) {
|
||||
severity := "info"
|
||||
if newStatus == "unhealthy" {
|
||||
severity = "warn"
|
||||
}
|
||||
|
||||
msg := fmt.Sprintf("Proxy %s (%s) health changed: %s -> %s",
|
||||
proxy.Domain, proxy.ID, oldStatus, newStatus)
|
||||
|
||||
metadata, _ := json.Marshal(map[string]any{
|
||||
"proxy_id": proxy.ID,
|
||||
"domain": proxy.Domain,
|
||||
"old_status": oldStatus,
|
||||
"new_status": newStatus,
|
||||
})
|
||||
|
||||
evt, err := h.store.InsertEvent(store.EventLog{
|
||||
Source: "proxy_health",
|
||||
Severity: severity,
|
||||
Message: msg,
|
||||
Metadata: string(metadata),
|
||||
})
|
||||
if err != nil {
|
||||
slog.Error("proxy health monitor: failed to persist event", "error", err)
|
||||
return
|
||||
}
|
||||
|
||||
h.eventBus.Publish(events.Event{
|
||||
Type: events.EventLog,
|
||||
Payload: events.EventLogPayload{
|
||||
ID: evt.ID,
|
||||
Source: "proxy_health",
|
||||
Severity: severity,
|
||||
Message: msg,
|
||||
Metadata: string(metadata),
|
||||
CreatedAt: evt.CreatedAt,
|
||||
},
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user