diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..d17f702 --- /dev/null +++ b/Makefile @@ -0,0 +1,21 @@ +.PHONY: build build-frontend build-backend dev clean + +# Build everything: frontend first, then backend (which embeds frontend). +build: build-frontend build-backend + +# Build the SvelteKit frontend to web/build/. +build-frontend: + cd web && npm install && npm run build + +# Build the Go binary (embeds web/build/ via go:embed). +build-backend: + go build -o docker-watcher ./cmd/server + +# Run in development mode with hot reload. +# Requires air (go install github.com/air-verse/air@latest). +dev: + air -c .air.toml 2>/dev/null || go run ./cmd/server + +# Clean build artifacts. +clean: + rm -rf web/build web/node_modules/.vite docker-watcher diff --git a/PLAN.md b/PLAN.md index f384aaf..66403fc 100644 --- a/PLAN.md +++ b/PLAN.md @@ -284,8 +284,8 @@ Full dashboard for visibility, manual control, and configuration. 19. **Quick Deploy page** — paste image URL, auto-inspect, pre-fill form, one-click deploy 20. **Settings pages** — registries, credentials, global settings, webhook URL management 21. **Project config pages** — add/edit/delete projects and stages via UI -22. **Embed in Go** — build SvelteKit to static, embed with `go:embed`, serve from Go -23. **Real-time updates** — SSE for deploy progress and instance status changes +22. **Embed in Go** ✅ — build SvelteKit to static, embed with `go:embed`, serve from Go +23. **Real-time updates** ✅ — SSE for deploy progress and instance status changes ### Phase 4: Volumes & Environment @@ -329,7 +329,9 @@ stages: 30. **Blue-green deploys** — start new, health check, swap, stop old (zero downtime) 31. **Promote flow** — enforce `promote_from` for production deploys -32. **Auth on dashboard** — basic auth or token-based +32. **Auth on dashboard** — two modes, configurable via settings: + - **Local auth** — username/password stored in SQLite (hashed), for simple setups + - **OAuth2 / OpenID Connect** — integration with Authentik (or any OIDC provider), configurable client ID/secret/discovery URL 33. **Graceful shutdown** — drain in-progress deploys on SIGTERM 34. **Structured logging** — JSON logs with deploy context 35. **Config export** — download current SQLite state as YAML diff --git a/cmd/server/main.go b/cmd/server/main.go index 88f1a46..b7d80fb 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -2,6 +2,7 @@ package main import ( "context" + "io/fs" "log" "net/http" "os" @@ -10,11 +11,13 @@ import ( "syscall" "time" + dockerwatcher "github.com/alexei/docker-watcher" "github.com/alexei/docker-watcher/internal/api" "github.com/alexei/docker-watcher/internal/config" "github.com/alexei/docker-watcher/internal/crypto" "github.com/alexei/docker-watcher/internal/deployer" "github.com/alexei/docker-watcher/internal/docker" + "github.com/alexei/docker-watcher/internal/events" "github.com/alexei/docker-watcher/internal/health" "github.com/alexei/docker-watcher/internal/notify" "github.com/alexei/docker-watcher/internal/npm" @@ -71,14 +74,15 @@ func main() { // Initialize services. healthChecker := health.New() notifier := notify.New() + eventBus := events.New() - dep := deployer.New(dockerClient, npmClient, db, healthChecker, notifier, encKey) + dep := deployer.New(dockerClient, npmClient, db, healthChecker, notifier, eventBus, encKey) // Initialize webhook handler. webhookHandler := webhook.NewHandler(db, dep, dockerClient) // Ensure webhook secret exists. - secret, err := webhook.EnsureWebhookSecret(db) + _, err = webhook.EnsureWebhookSecret(db) if err != nil { log.Fatalf("ensure webhook secret: %v", err) } @@ -94,16 +98,29 @@ func main() { } // Build API server. - apiServer := api.NewServer(db, dockerClient, dep, webhookHandler, encKey) + apiServer := api.NewServer(db, dockerClient, dep, webhookHandler, eventBus, encKey) router := apiServer.Router() + // Serve embedded static files for the SPA frontend. + // The embed.FS has "web/build" as a prefix, so we sub it to get the root. + webBuildFS, err := fs.Sub(dockerwatcher.WebBuildFS, "web/build") + if err != nil { + log.Printf("WARNING: embedded frontend not available: %v", err) + } else { + staticHandler := api.StaticHandler(webBuildFS) + // Handle all non-API routes with the static file server. + router.NotFound(staticHandler.ServeHTTP) + } + // Start HTTP server. addr := envOrDefault("LISTEN_ADDR", ":8080") httpServer := &http.Server{ - Addr: addr, - Handler: router, - ReadTimeout: 30 * time.Second, - WriteTimeout: 60 * time.Second, + Addr: addr, + Handler: router, + ReadTimeout: 30 * time.Second, + // WriteTimeout is disabled (0) to support SSE long-lived connections. + // Individual non-SSE handlers should use context timeouts as needed. + WriteTimeout: 0, IdleTimeout: 120 * time.Second, } diff --git a/internal/api/deploys.go b/internal/api/deploys.go index e625947..e4baef6 100644 --- a/internal/api/deploys.go +++ b/internal/api/deploys.go @@ -1,14 +1,11 @@ package api import ( - "errors" "log" "net/http" "strconv" "strings" - "github.com/go-chi/chi/v5" - "github.com/alexei/docker-watcher/internal/store" ) @@ -30,29 +27,8 @@ func (s *Server) listDeploys(w http.ResponseWriter, r *http.Request) { respondJSON(w, http.StatusOK, deploys) } -// getDeployLogs handles GET /api/deploys/{id}/logs. -// This is an SSE stub that returns logs as JSON for now. -// Real SSE streaming will be implemented in Phase 11. -func (s *Server) getDeployLogs(w http.ResponseWriter, r *http.Request) { - deployID := chi.URLParam(r, "id") - - // Verify deploy exists. - if _, err := s.store.GetDeployByID(deployID); err != nil { - if errors.Is(err, store.ErrNotFound) { - respondNotFound(w, "deploy") - return - } - respondError(w, http.StatusInternalServerError, "failed to get deploy: "+err.Error()) - return - } - - logs, err := s.store.GetDeployLogs(deployID) - if err != nil { - respondError(w, http.StatusInternalServerError, "failed to get deploy logs: "+err.Error()) - return - } - respondJSON(w, http.StatusOK, logs) -} +// NOTE: getDeployLogs has been replaced by streamDeployLogs in sse.go. +// The new handler supports both SSE streaming and JSON fallback via Accept header. // inspectRequest is the expected JSON body for POST /api/deploy/inspect. type inspectRequest struct { diff --git a/internal/api/router.go b/internal/api/router.go index 347e4c7..81ad872 100644 --- a/internal/api/router.go +++ b/internal/api/router.go @@ -4,6 +4,7 @@ import ( "github.com/go-chi/chi/v5" "github.com/alexei/docker-watcher/internal/docker" + "github.com/alexei/docker-watcher/internal/events" "github.com/alexei/docker-watcher/internal/store" "github.com/alexei/docker-watcher/internal/webhook" ) @@ -14,6 +15,7 @@ type Server struct { docker *docker.Client deployer DeployTriggerer webhook *webhook.Handler + eventBus *events.Bus encKey [32]byte } @@ -23,6 +25,7 @@ func NewServer( dockerClient *docker.Client, deployer DeployTriggerer, webhookHandler *webhook.Handler, + eventBus *events.Bus, encKey [32]byte, ) *Server { return &Server{ @@ -30,6 +33,7 @@ func NewServer( docker: dockerClient, deployer: deployer, webhook: webhookHandler, + eventBus: eventBus, encKey: encKey, } } @@ -73,7 +77,10 @@ func (s *Server) Router() chi.Router { // Deploy endpoints. r.Get("/deploys", s.listDeploys) - r.Get("/deploys/{id}/logs", s.getDeployLogs) + r.Get("/deploys/{id}/logs", s.streamDeployLogs) + + // SSE endpoint for real-time instance status and deploy events. + r.Get("/events", s.streamEvents) // Quick deploy endpoints. r.Post("/deploy/inspect", s.inspectImage) diff --git a/internal/api/sse.go b/internal/api/sse.go new file mode 100644 index 0000000..a408601 --- /dev/null +++ b/internal/api/sse.go @@ -0,0 +1,192 @@ +package api + +import ( + "encoding/json" + "errors" + "fmt" + "log" + "net/http" + "strings" + + "github.com/go-chi/chi/v5" + + "github.com/alexei/docker-watcher/internal/events" + "github.com/alexei/docker-watcher/internal/store" +) + +// streamDeployLogs handles GET /api/deploys/{id}/logs. +// It supports both SSE streaming and JSON fallback based on the Accept header. +// +// SSE mode (Accept: text/event-stream): +// +// Streams deploy log events in real-time. Existing logs are sent first, +// then new logs are pushed as they arrive via the event bus. +// +// JSON mode (default): +// +// Returns all existing deploy logs as a JSON array. +func (s *Server) streamDeployLogs(w http.ResponseWriter, r *http.Request) { + deployID := chi.URLParam(r, "id") + + // Verify deploy exists. + deploy, err := s.store.GetDeployByID(deployID) + if err != nil { + if errors.Is(err, store.ErrNotFound) { + respondNotFound(w, "deploy") + return + } + respondError(w, http.StatusInternalServerError, "failed to get deploy: "+err.Error()) + return + } + + // JSON fallback: return existing logs as array. + accept := r.Header.Get("Accept") + if !strings.Contains(accept, "text/event-stream") { + logs, err := s.store.GetDeployLogs(deployID) + if err != nil { + respondError(w, http.StatusInternalServerError, "failed to get deploy logs: "+err.Error()) + return + } + respondJSON(w, http.StatusOK, logs) + return + } + + // SSE mode. + flusher, ok := w.(http.Flusher) + if !ok { + respondError(w, http.StatusInternalServerError, "streaming not supported") + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + // Send existing logs first. + existingLogs, err := s.store.GetDeployLogs(deployID) + if err != nil { + log.Printf("[sse] failed to get existing deploy logs: %v", err) + } else { + for _, entry := range existingLogs { + writeSSE(w, flusher, events.Event{ + Type: events.EventDeployLog, + Payload: events.DeployLogPayload{ + DeployID: deployID, + Message: entry.Message, + Level: entry.Level, + }, + }) + } + } + + // If deploy is already finished, send completion and close. + if isTerminalStatus(deploy.Status) { + writeSSE(w, flusher, events.Event{ + Type: events.EventDeployStatus, + Payload: events.DeployStatusPayload{ + DeployID: deployID, + ProjectID: deploy.ProjectID, + StageID: deploy.StageID, + ImageTag: deploy.ImageTag, + Status: deploy.Status, + Error: deploy.Error, + }, + }) + return + } + + // Subscribe to new deploy log events for this deploy. + sub := s.eventBus.Subscribe(func(evt events.Event) bool { + switch payload := evt.Payload.(type) { + case events.DeployLogPayload: + return payload.DeployID == deployID + case events.DeployStatusPayload: + return payload.DeployID == deployID + default: + return false + } + }) + defer s.eventBus.Unsubscribe(sub) + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case evt, ok := <-sub: + if !ok { + return + } + writeSSE(w, flusher, evt) + + // Close stream when deploy reaches terminal status. + if evt.Type == events.EventDeployStatus { + if payload, ok := evt.Payload.(events.DeployStatusPayload); ok { + if isTerminalStatus(payload.Status) { + return + } + } + } + } + } +} + +// streamEvents handles GET /api/events. +// It streams instance status changes and deploy status changes via SSE. +func (s *Server) streamEvents(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + if !ok { + respondError(w, http.StatusInternalServerError, "streaming not supported") + return + } + + w.Header().Set("Content-Type", "text/event-stream") + w.Header().Set("Cache-Control", "no-cache") + w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") + w.WriteHeader(http.StatusOK) + flusher.Flush() + + // Subscribe to instance status and deploy status events. + sub := s.eventBus.Subscribe(func(evt events.Event) bool { + return evt.Type == events.EventInstanceStatus || evt.Type == events.EventDeployStatus + }) + defer s.eventBus.Unsubscribe(sub) + + ctx := r.Context() + for { + select { + case <-ctx.Done(): + return + case evt, ok := <-sub: + if !ok { + return + } + writeSSE(w, flusher, evt) + } + } +} + +// writeSSE writes a single SSE event to the response writer and flushes. +func writeSSE(w http.ResponseWriter, flusher http.Flusher, evt events.Event) { + data, err := json.Marshal(evt) + if err != nil { + log.Printf("[sse] marshal event: %v", err) + return + } + fmt.Fprintf(w, "data: %s\n\n", data) + flusher.Flush() +} + +// isTerminalStatus returns true if the deploy status is final. +func isTerminalStatus(status string) bool { + switch status { + case "success", "failed", "rolled_back": + return true + default: + return false + } +} diff --git a/internal/api/static.go b/internal/api/static.go new file mode 100644 index 0000000..ed35efc --- /dev/null +++ b/internal/api/static.go @@ -0,0 +1,42 @@ +package api + +import ( + "io/fs" + "net/http" + "strings" +) + +// StaticHandler serves embedded SPA files with fallback to index.html +// for all non-API routes (SPA client-side routing support). +func StaticHandler(webFS fs.FS) http.Handler { + fileServer := http.FileServer(http.FS(webFS)) + + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + // Skip API routes — they are handled by the API router. + if strings.HasPrefix(r.URL.Path, "/api") { + http.NotFound(w, r) + return + } + + // Try to serve the exact file. + path := strings.TrimPrefix(r.URL.Path, "/") + if path == "" { + path = "index.html" + } + + // Check if file exists in the embedded FS. + f, err := webFS.Open(path) + if err == nil { + f.Close() + // Clear the JSON content-type set by middleware — let file server decide. + w.Header().Del("Content-Type") + fileServer.ServeHTTP(w, r) + return + } + + // File not found: serve index.html for SPA client-side routing. + r.URL.Path = "/" + w.Header().Del("Content-Type") + fileServer.ServeHTTP(w, r) + }) +} diff --git a/internal/deployer/deployer.go b/internal/deployer/deployer.go index acd70a0..3b37260 100644 --- a/internal/deployer/deployer.go +++ b/internal/deployer/deployer.go @@ -9,6 +9,7 @@ import ( "github.com/alexei/docker-watcher/internal/crypto" "github.com/alexei/docker-watcher/internal/docker" + "github.com/alexei/docker-watcher/internal/events" "github.com/alexei/docker-watcher/internal/health" "github.com/alexei/docker-watcher/internal/notify" "github.com/alexei/docker-watcher/internal/npm" @@ -25,9 +26,15 @@ type Deployer struct { store *store.Store health *health.Checker notifier *notify.Notifier + eventBus EventPublisher encKey [32]byte } +// EventPublisher is the interface for publishing events to the event bus. +type EventPublisher interface { + Publish(evt events.Event) +} + // New creates a new Deployer with all required dependencies. func New( dockerClient *docker.Client, @@ -35,6 +42,7 @@ func New( st *store.Store, checker *health.Checker, notifier *notify.Notifier, + eventBus EventPublisher, encKey [32]byte, ) *Deployer { return &Deployer{ @@ -43,6 +51,7 @@ func New( store: st, health: checker, notifier: notifier, + eventBus: eventBus, encKey: encKey, } } @@ -91,6 +100,7 @@ func (d *Deployer) TriggerDeploy(ctx context.Context, projectID, stageID, imageT if deployErr != nil { d.logDeploy(deploy.ID, fmt.Sprintf("Deploy failed: %v", deployErr), "error") + d.publishDeployStatus(deploy.ID, projectID, stageID, imageTag, "failed", deployErr.Error()) d.rollback(ctx, deploy.ID, containerID, npmProxyID, instanceID) d.notifier.Send(settings.NotificationURL, notify.Event{ @@ -108,6 +118,7 @@ func (d *Deployer) TriggerDeploy(ctx context.Context, projectID, stageID, imageT if err := d.store.UpdateDeployStatus(deploy.ID, "success", ""); err != nil { log.Printf("deployer: update deploy status to success: %v", err) } + d.publishDeployStatus(deploy.ID, projectID, stageID, imageTag, "success", "") subdomain := d.buildSubdomain(project, stage, settings, imageTag) fullURL := fmt.Sprintf("https://%s.%s", subdomain, settings.Domain) @@ -144,6 +155,7 @@ func (d *Deployer) executeDeploy( if err := d.store.UpdateDeployStatus(deployID, "pulling", ""); err != nil { log.Printf("deployer: update deploy status: %v", err) } + d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "pulling", "") d.logDeploy(deployID, fmt.Sprintf("Pulling image %s:%s", project.Image, imageTag), "info") authConfig, err := d.buildRegistryAuth(project) @@ -167,6 +179,7 @@ func (d *Deployer) executeDeploy( if err := d.store.UpdateDeployStatus(deployID, "starting", ""); err != nil { log.Printf("deployer: update deploy status: %v", err) } + d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "starting", "") // Pre-generate instance ID so it can be set as a container label. instanceID = uuid.New().String() @@ -224,12 +237,14 @@ func (d *Deployer) executeDeploy( if err := d.store.UpdateInstanceStatus(instanceID, "running"); err != nil { log.Printf("deployer: update instance status to running: %v", err) } + d.publishInstanceStatus(instanceID, project.ID, stage.ID, "running") d.logDeploy(deployID, "Container started", "info") // Step 4: Configure NPM proxy. if err := d.store.UpdateDeployStatus(deployID, "configuring_proxy", ""); err != nil { log.Printf("deployer: update deploy status: %v", err) } + d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "configuring_proxy", "") npmProxyID, err = d.configureProxy(ctx, deployID, settings, containerName, project.Port, subdomain) if err != nil { @@ -248,6 +263,7 @@ func (d *Deployer) executeDeploy( if err := d.store.UpdateDeployStatus(deployID, "health_checking", ""); err != nil { log.Printf("deployer: update deploy status: %v", err) } + d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "health_checking", "") healthURL := fmt.Sprintf("http://%s:%d%s", containerName, project.Port, project.Healthcheck) d.logDeploy(deployID, fmt.Sprintf("Running health check: %s", healthURL), "info") @@ -466,11 +482,54 @@ func (d *Deployer) parseEnvVars(envJSON string) []string { return vars } -// logDeploy appends a log entry for a deploy. Errors are logged to stderr but not propagated. +// logDeploy appends a log entry for a deploy and publishes it on the event bus. +// Errors are logged to stderr but not propagated. func (d *Deployer) logDeploy(deployID, message, level string) { if err := d.store.AppendDeployLog(deployID, message, level); err != nil { log.Printf("deployer: append deploy log: %v", err) } + if d.eventBus != nil { + d.eventBus.Publish(events.Event{ + Type: events.EventDeployLog, + Payload: events.DeployLogPayload{ + DeployID: deployID, + Message: message, + Level: level, + }, + }) + } +} + +// publishDeployStatus publishes a deploy status change event on the bus. +func (d *Deployer) publishDeployStatus(deployID, projectID, stageID, imageTag, status, deployErr string) { + if d.eventBus != nil { + d.eventBus.Publish(events.Event{ + Type: events.EventDeployStatus, + Payload: events.DeployStatusPayload{ + DeployID: deployID, + ProjectID: projectID, + StageID: stageID, + ImageTag: imageTag, + Status: status, + Error: deployErr, + }, + }) + } +} + +// publishInstanceStatus publishes an instance status change event on the bus. +func (d *Deployer) publishInstanceStatus(instanceID, projectID, stageID, status string) { + if d.eventBus != nil { + d.eventBus.Publish(events.Event{ + Type: events.EventInstanceStatus, + Payload: events.InstanceStatusPayload{ + InstanceID: instanceID, + ProjectID: projectID, + StageID: stageID, + Status: status, + }, + }) + } } // truncateID safely truncates a Docker ID to 12 characters for display. diff --git a/internal/events/bus.go b/internal/events/bus.go new file mode 100644 index 0000000..a4097a2 --- /dev/null +++ b/internal/events/bus.go @@ -0,0 +1,121 @@ +package events + +import ( + "encoding/json" + "sync" +) + +// EventType identifies the kind of event being published. +type EventType string + +const ( + // EventDeployLog is emitted when a new deploy log line is appended. + EventDeployLog EventType = "deploy_log" + + // EventInstanceStatus is emitted when an instance status changes. + EventInstanceStatus EventType = "instance_status" + + // EventDeployStatus is emitted when a deploy status changes. + EventDeployStatus EventType = "deploy_status" +) + +// Event is a single event published on the bus. +type Event struct { + Type EventType `json:"type"` + Payload any `json:"payload"` +} + +// DeployLogPayload is the payload for EventDeployLog events. +type DeployLogPayload struct { + DeployID string `json:"deploy_id"` + Message string `json:"message"` + Level string `json:"level"` +} + +// InstanceStatusPayload is the payload for EventInstanceStatus events. +type InstanceStatusPayload struct { + InstanceID string `json:"instance_id"` + ProjectID string `json:"project_id"` + StageID string `json:"stage_id"` + Status string `json:"status"` +} + +// DeployStatusPayload is the payload for EventDeployStatus events. +type DeployStatusPayload struct { + DeployID string `json:"deploy_id"` + ProjectID string `json:"project_id"` + StageID string `json:"stage_id"` + ImageTag string `json:"image_tag"` + Status string `json:"status"` + Error string `json:"error,omitempty"` +} + +// Subscriber is a channel that receives events. +type Subscriber chan Event + +// Bus is a simple in-process pub/sub event bus. +// It supports topic-based filtering and per-subscriber buffering. +type Bus struct { + mu sync.RWMutex + subscribers map[Subscriber]subscriberInfo +} + +type subscriberInfo struct { + filter func(Event) bool +} + +// New creates a new event bus. +func New() *Bus { + return &Bus{ + subscribers: make(map[Subscriber]subscriberInfo), + } +} + +// Subscribe registers a new subscriber with an optional filter. +// If filter is nil, the subscriber receives all events. +// The returned channel is buffered to avoid blocking publishers. +func (b *Bus) Subscribe(filter func(Event) bool) Subscriber { + ch := make(Subscriber, 64) + b.mu.Lock() + b.subscribers[ch] = subscriberInfo{filter: filter} + b.mu.Unlock() + return ch +} + +// Unsubscribe removes a subscriber and closes its channel. +func (b *Bus) Unsubscribe(ch Subscriber) { + b.mu.Lock() + if _, ok := b.subscribers[ch]; ok { + delete(b.subscribers, ch) + close(ch) + } + b.mu.Unlock() +} + +// Publish sends an event to all matching subscribers. +// If a subscriber's buffer is full, the event is dropped for that subscriber +// to avoid blocking the publisher. +func (b *Bus) Publish(evt Event) { + b.mu.RLock() + defer b.mu.RUnlock() + + for ch, info := range b.subscribers { + if info.filter != nil && !info.filter(evt) { + continue + } + // Non-blocking send — drop if subscriber is slow. + select { + case ch <- evt: + default: + } + } +} + +// MarshalEvent serializes an event to a JSON string suitable for SSE data lines. +func MarshalEvent(evt Event) (string, error) { + data, err := json.Marshal(evt) + if err != nil { + return "", err + } + return string(data), nil +} diff --git a/plans/docker-watcher-core/PLAN.md b/plans/docker-watcher-core/PLAN.md index 686df1d..4fbd9fa 100644 --- a/plans/docker-watcher-core/PLAN.md +++ b/plans/docker-watcher-core/PLAN.md @@ -33,7 +33,7 @@ A self-hosted tool that automates Docker container deployment with Nginx Proxy M - [x] Phase 8: REST API Layer [domain: backend] → [subplan](./phase-8-api-layer.md) - [x] Phase 9: SvelteKit Dashboard & Project Views [domain: frontend] → [subplan](./phase-9-dashboard.md) - [x] Phase 10: Quick Deploy & Settings Pages [domain: frontend] → [subplan](./phase-10-settings-deploy.md) -- [ ] Phase 11: Frontend Embed & Real-Time Updates [domain: fullstack] → [subplan](./phase-11-embed-sse.md) +- [x] Phase 11: Frontend Embed & Real-Time Updates [domain: fullstack] → [subplan](./phase-11-embed-sse.md) - [ ] Phase 12: Hardening [domain: backend] → [subplan](./phase-12-hardening.md) - [ ] Phase 13: Frontend Polish & Modern UI [domain: frontend] → [subplan](./phase-13-ui-polish.md) - [ ] Phase 14: Volumes & Environment [domain: fullstack] → [subplan](./phase-14-volumes-env.md) @@ -56,8 +56,8 @@ A self-hosted tool that automates Docker container deployment with Nginx Proxy M | Phase 7: Deployer & Health | backend | ✅ Complete | ✅ Pass w/ fixes | ⏭️ Skip (Big Bang) | ✅ | | Phase 8: API Layer | backend | ✅ Complete | ✅ Pass w/ fixes | ⏭️ Skip (Big Bang) | ✅ | | Phase 9: Dashboard | frontend | ✅ Complete | ⬜ Pending | ⏭️ Skip (Big Bang) | ✅ | -| Phase 10: Settings & Deploy | frontend | ✅ Complete | ⬜ Pending | ⏭️ Skip (Big Bang) | ⬜ | -| Phase 11: Embed & SSE | fullstack | ⬜ Not Started | ⬜ | ⏭️ Skip (Big Bang) | ⬜ | +| Phase 10: Settings & Deploy | frontend | ✅ Complete | ⬜ Pending | ⏭️ Skip (Big Bang) | ✅ | +| Phase 11: Embed & SSE | fullstack | ✅ Complete | ⬜ Pending | ⏭️ Skip (Big Bang) | ⬜ | | Phase 12: Hardening | backend | ⬜ Not Started | ⬜ | ⏭️ Skip (Big Bang) | ⬜ | | Phase 13: UI Polish | frontend | ⬜ Not Started | ⬜ | ⏭️ Skip (Big Bang) | ⬜ | | Phase 14: Volumes & Env | fullstack | ⬜ Not Started | ⬜ | ✅ Required (Final) | ⬜ | @@ -85,6 +85,13 @@ A self-hosted tool that automates Docker container deployment with Nginx Proxy M **Why:** Missing from feature planner phases but present in root PLAN.md Phase 4 **Impact on existing phases:** Phase 14 becomes the final phase (build/tests required). Phase 13 (UI Polish) remains but no longer the final phase for build enforcement. +### Amendment 4 — 2026-03-27 + +**Type:** Modified phase +**What changed:** Updated Phase 12 (Hardening) auth tasks to support two modes: Local auth (username/password in SQLite with bcrypt) and OAuth2/OIDC (Authentik or any OIDC provider with configurable discovery URL). Added auth settings UI, user management, OIDC callback flow. +**Why:** Root PLAN.md was updated to require OAuth2/OIDC support alongside local auth +**Impact on existing phases:** Phase 12 task count increased from 10 to 12. Added new files for auth module and login page. + ## Final Review - [ ] Comprehensive code review diff --git a/plans/docker-watcher-core/phase-11-embed-sse.md b/plans/docker-watcher-core/phase-11-embed-sse.md index ebfb130..90d4f15 100644 --- a/plans/docker-watcher-core/phase-11-embed-sse.md +++ b/plans/docker-watcher-core/phase-11-embed-sse.md @@ -1,6 +1,6 @@ # Phase 11: Frontend Embed & Real-Time Updates -**Status:** ⬜ Not Started +**Status:** Done **Parent plan:** [PLAN.md](./PLAN.md) **Domain:** fullstack @@ -9,26 +9,31 @@ Build SvelteKit to static files, embed into the Go binary with `go:embed`, serve ## Tasks -- [ ] Task 1: Configure SvelteKit static adapter to output to `web/build/` -- [ ] Task 2: Add `//go:embed web/build` directive in Go — serve static files -- [ ] Task 3: Create Go handler for serving embedded SPA — serve index.html for all non-API routes (SPA fallback) -- [ ] Task 4: Implement SSE endpoint for deploy logs — `GET /api/deploys/:id/logs` streams deploy progress in real-time -- [ ] Task 5: Implement SSE endpoint for instance status — `GET /api/events` streams instance status changes -- [ ] Task 6: Create event bus/broadcaster in Go — publish events from deployer, subscribe from SSE handlers -- [ ] Task 7: Frontend: connect to SSE for deploy progress — update deploy log view in real-time -- [ ] Task 8: Frontend: connect to SSE for instance status — update dashboard/project views without refresh -- [ ] Task 9: Handle SSE reconnection in frontend — auto-reconnect with backoff on disconnect -- [ ] Task 10: Build script/Makefile — `make build` builds frontend then Go binary +- [x] Task 1: Configure SvelteKit static adapter to output to `web/build/` (already configured) +- [x] Task 2: Add `//go:embed web/build` directive in Go — `web.go` at project root +- [x] Task 3: Create Go handler for serving embedded SPA — `internal/api/static.go` with SPA fallback +- [x] Task 4: Implement SSE endpoint for deploy logs — `GET /api/deploys/:id/logs` (SSE + JSON fallback) +- [x] Task 5: Implement SSE endpoint for instance status — `GET /api/events` streams instance status changes +- [x] Task 6: Create event bus/broadcaster in Go — `internal/events/bus.go` with pub/sub channels +- [x] Task 7: Frontend: connect to SSE for deploy progress — `connectDeployLogs()` in `web/src/lib/sse.ts` +- [x] Task 8: Frontend: connect to SSE for instance status — global SSE in `+layout.svelte` via store +- [x] Task 9: Handle SSE reconnection in frontend — exponential backoff with jitter in `connectSSE()` +- [x] Task 10: Build script/Makefile — `make build` builds frontend then Go binary ## Files to Modify/Create -- `web/svelte.config.js` — ensure static adapter outputs to `web/build/` +- `web/svelte.config.js` — already configured with static adapter outputting to `web/build/` +- `web.go` — root-level embed directive (`//go:embed web/build`) - `internal/api/static.go` — embedded static file server with SPA fallback - `internal/api/sse.go` — SSE endpoints for deploy logs and instance events - `internal/events/bus.go` — event bus for publishing/subscribing to events - `web/src/lib/sse.ts` — SSE client helper with auto-reconnect -- `web/src/routes/+layout.svelte` — wire up global SSE connection for instance status +- `web/src/lib/stores/instance-status.ts` — Svelte store for real-time instance status +- `web/src/routes/+layout.svelte` — wired up global SSE connection for instance status - `Makefile` — build frontend + backend -- `cmd/server/main.go` — wire embedded static serving and event bus +- `cmd/server/main.go` — wired embedded static serving and event bus +- `internal/api/router.go` — added eventBus to Server, SSE routes +- `internal/api/deploys.go` — removed old JSON stub, replaced by SSE handler +- `internal/deployer/deployer.go` — added event publishing for deploy logs, status, instance status ## Acceptance Criteria - `make build` produces a single Go binary with embedded frontend @@ -43,13 +48,29 @@ Build SvelteKit to static files, embed into the Go binary with `go:embed`, serve - Event bus: simple pub/sub with channels — no external dependency needed - SSE format: `data: {"type": "deploy_log", "payload": {...}}\n\n` - Keep SSE connections lightweight — use context cancellation for cleanup +- WriteTimeout on HTTP server set to 0 to support long-lived SSE connections +- Deploy logs endpoint serves both SSE (Accept: text/event-stream) and JSON (default) ## Review Checklist -- [ ] All tasks completed -- [ ] Single binary serves both API and frontend -- [ ] SSE handles multiple concurrent clients -- [ ] No goroutine leaks on SSE disconnect -- [ ] Build process is reproducible (Makefile) +- [x] All tasks completed +- [x] Single binary serves both API and frontend +- [x] SSE handles multiple concurrent clients (buffered channels, non-blocking publish) +- [x] No goroutine leaks on SSE disconnect (context cancellation + Unsubscribe) +- [x] Build process is reproducible (Makefile) ## Handoff to Next Phase - + +### What was implemented +- **Event bus** (`internal/events/bus.go`): In-process pub/sub with topic filtering, buffered subscriber channels (64 events), non-blocking publish. Supports `EventDeployLog`, `EventInstanceStatus`, and `EventDeployStatus` event types. +- **SSE endpoints**: `GET /api/deploys/{id}/logs` streams deploy logs with JSON fallback; `GET /api/events` streams global instance/deploy status changes. +- **Static file serving**: `web.go` at project root embeds `web/build/`, `internal/api/static.go` serves SPA with fallback. Mounted via chi's `NotFound` handler. +- **Frontend SSE client** (`web/src/lib/sse.ts`): `connectSSE()` with exponential backoff + jitter, `connectDeployLogs()` and `connectGlobalEvents()` convenience functions. +- **Instance status store** (`web/src/lib/stores/instance-status.ts`): Svelte writable store updated by global SSE connection in `+layout.svelte`. +- **Deployer integration**: `deployer.go` now publishes deploy log, deploy status, and instance status events via `EventPublisher` interface. + +### Key integration points for next phase +- `events.Bus` is passed to both `api.NewServer` and `deployer.New` +- `api.NewServer` now requires an `*events.Bus` parameter (6th arg before encKey) +- `deployer.New` now requires an `EventPublisher` parameter (6th arg before encKey) +- HTTP server `WriteTimeout` is 0 to support SSE +- The `web.go` file at project root uses package name `dockerwatcher` (imported as `github.com/alexei/docker-watcher`) diff --git a/plans/docker-watcher-core/phase-12-hardening.md b/plans/docker-watcher-core/phase-12-hardening.md index 2576498..a195218 100644 --- a/plans/docker-watcher-core/phase-12-hardening.md +++ b/plans/docker-watcher-core/phase-12-hardening.md @@ -11,22 +11,31 @@ Production hardening — blue-green deploys, promote flow, dashboard auth, grace - [ ] Task 1: Blue-green deploys — start new container, health check, swap NPM proxy, then stop old container (zero downtime) - [ ] Task 2: Promote flow — enforce `promote_from` for production deploys (only tags running in source stage are eligible) -- [ ] Task 3: Dashboard auth — basic auth or token-based authentication for the web UI -- [ ] Task 4: Auth middleware — protect all /api/* routes except webhook -- [ ] Task 5: Graceful shutdown — handle SIGTERM/SIGINT, drain in-progress deploys, close DB, stop poller -- [ ] Task 6: Structured logging — JSON logs with deploy context (project, stage, tag, instance ID) -- [ ] Task 7: Config export — download current SQLite state as YAML (reverse of seed import) -- [ ] Task 8: Dockerfile — multi-stage build (build frontend + Go, copy to minimal image) -- [ ] Task 9: docker-compose.yml — production-ready compose file with volumes, network, env -- [ ] Task 10: Final wiring review — ensure all services are properly initialized and shut down +- [ ] Task 3: Local auth — username/password stored in SQLite (bcrypt hashed), login endpoint, session token (JWT or cookie) +- [ ] Task 4: OAuth2/OIDC auth — integration with Authentik or any OIDC provider (configurable client ID, client secret, discovery URL) +- [ ] Task 5: Auth settings UI — settings page to choose auth mode (local/OIDC), configure OIDC provider, manage local users +- [ ] Task 6: Auth middleware — protect all /api/* routes except webhook; check session/JWT/OIDC token +- [ ] Task 7: Graceful shutdown — handle SIGTERM/SIGINT, drain in-progress deploys, close DB, stop poller +- [ ] Task 8: Structured logging — JSON logs with deploy context (project, stage, tag, instance ID) +- [ ] Task 9: Config export — download current SQLite state as YAML (reverse of seed import) +- [ ] Task 10: Dockerfile — multi-stage build (build frontend + Go, copy to minimal image) +- [ ] Task 11: docker-compose.yml — production-ready compose file with volumes, network, env +- [ ] Task 12: Final wiring review — ensure all services are properly initialized and shut down ## Files to Modify/Create - `internal/deployer/bluegreen.go` — blue-green deploy strategy - `internal/deployer/promote.go` — promote flow logic -- `internal/api/auth.go` — authentication middleware +- `internal/auth/local.go` — local auth (bcrypt password hashing, session tokens) +- `internal/auth/oidc.go` — OAuth2/OIDC provider integration +- `internal/auth/middleware.go` — auth middleware (session/JWT/OIDC token validation) +- `internal/auth/models.go` — user model, auth settings, session store +- `internal/api/auth.go` — auth API endpoints (login, logout, OIDC callback, user management) - `internal/config/export.go` — config export to YAML - `internal/logging/logger.go` — structured JSON logger -- `cmd/server/main.go` — graceful shutdown, structured logging init +- `internal/store/users.go` — user CRUD, auth settings persistence +- `web/src/routes/login/+page.svelte` — login page +- `web/src/routes/settings/auth/+page.svelte` — auth settings UI +- `cmd/server/main.go` — graceful shutdown, structured logging, auth init - `Dockerfile` — multi-stage build - `docker-compose.yml` — production compose file @@ -41,11 +50,15 @@ Production hardening — blue-green deploys, promote flow, dashboard auth, grace ## Notes - Blue-green: keep old container running until new one passes health check, then swap NPM proxy and stop old -- Auth: start simple (basic auth via env var), can be enhanced later (JWT, OIDC) +- Auth has two modes configurable via settings: + - **Local auth**: username/password in SQLite (bcrypt hashed), JWT session tokens + - **OAuth2/OIDC**: integration with Authentik or any OIDC provider (client ID, secret, discovery URL) +- First launch: create default admin user with configurable password via ADMIN_PASSWORD env var +- OIDC flow: redirect to provider → callback → create/link local user → issue session - SIGTERM handling: use Go's `os/signal` + `context.WithCancel` - Structured logging: use `log/slog` (Go stdlib since 1.21) - Dockerfile: build stage with Node.js + Go, runtime stage with scratch/alpine -- This is the FINAL phase — build and full test suite MUST pass here +- Phase 13 (UI Polish) and Phase 14 (Volumes & Env) follow this phase ## Review Checklist - [ ] All tasks completed diff --git a/web.go b/web.go new file mode 100644 index 0000000..c9f1cd2 --- /dev/null +++ b/web.go @@ -0,0 +1,9 @@ +package dockerwatcher + +import "embed" + +// WebBuildFS holds the embedded SvelteKit static build output. +// The build directory is populated by running `npm run build` in the web/ directory. +// +//go:embed web/build +var WebBuildFS embed.FS diff --git a/web/src/lib/sse.ts b/web/src/lib/sse.ts new file mode 100644 index 0000000..d2e15e0 --- /dev/null +++ b/web/src/lib/sse.ts @@ -0,0 +1,193 @@ +/** + * SSE client helper with auto-reconnect and exponential backoff. + * + * Provides type-safe event handling for Docker Watcher's real-time + * event streams (deploy logs and instance status changes). + */ + +// ── Types ────────────────────────────────────────────────────────── + +export type SSEEventType = 'deploy_log' | 'instance_status' | 'deploy_status'; + +export interface SSEEvent { + type: SSEEventType; + payload: T; +} + +export interface DeployLogPayload { + deploy_id: string; + message: string; + level: 'info' | 'warn' | 'error'; +} + +export interface InstanceStatusPayload { + instance_id: string; + project_id: string; + stage_id: string; + status: string; +} + +export interface DeployStatusPayload { + deploy_id: string; + project_id: string; + stage_id: string; + image_tag: string; + status: string; + error?: string; +} + +type SSEPayload = DeployLogPayload | InstanceStatusPayload | DeployStatusPayload; + +export interface SSEOptions { + /** Called for each SSE event received. */ + onEvent: (event: SSEEvent) => void; + /** Called when the connection is established. */ + onOpen?: () => void; + /** Called when the connection is lost. Receives the retry attempt number. */ + onError?: (attempt: number) => void; + /** Called when reconnection is given up (max retries exceeded). */ + onGiveUp?: () => void; + /** Maximum number of reconnect attempts. 0 = infinite. Default: 0 */ + maxRetries?: number; + /** Initial backoff delay in ms. Default: 1000 */ + initialDelay?: number; + /** Maximum backoff delay in ms. Default: 30000 */ + maxDelay?: number; +} + +// ── SSE Connection ───────────────────────────────────────────────── + +export interface SSEConnection { + /** Close the connection and stop reconnecting. */ + close: () => void; +} + +/** + * Creates an SSE connection to the given URL with auto-reconnect. + * + * Uses exponential backoff with jitter for reconnection. + * Returns an object with a `close` method to cleanly shut down. + */ +export function connectSSE(url: string, options: SSEOptions): SSEConnection { + const { + onEvent, + onOpen, + onError, + onGiveUp, + maxRetries = 0, + initialDelay = 1000, + maxDelay = 30000 + } = options; + + let eventSource: EventSource | null = null; + let retryCount = 0; + let retryTimeout: ReturnType | null = null; + let closed = false; + + function connect(): void { + if (closed) return; + + eventSource = new EventSource(url); + + eventSource.onopen = () => { + retryCount = 0; + onOpen?.(); + }; + + eventSource.onmessage = (messageEvent: MessageEvent) => { + try { + const parsed: SSEEvent = JSON.parse(messageEvent.data); + onEvent(parsed); + } catch { + // Ignore malformed events. + } + }; + + eventSource.onerror = () => { + eventSource?.close(); + eventSource = null; + + if (closed) return; + + retryCount++; + onError?.(retryCount); + + if (maxRetries > 0 && retryCount > maxRetries) { + onGiveUp?.(); + return; + } + + // Exponential backoff with jitter. + const delay = Math.min(initialDelay * Math.pow(2, retryCount - 1), maxDelay); + const jitter = delay * 0.2 * Math.random(); + const totalDelay = delay + jitter; + + retryTimeout = setTimeout(connect, totalDelay); + }; + } + + connect(); + + return { + close() { + closed = true; + if (retryTimeout !== null) { + clearTimeout(retryTimeout); + retryTimeout = null; + } + eventSource?.close(); + eventSource = null; + } + }; +} + +// ── Convenience Factories ────────────────────────────────────────── + +/** + * Connect to deploy log SSE stream for a specific deploy. + * Streams existing logs first, then real-time updates. + */ +export function connectDeployLogs( + deployId: string, + callbacks: { + onLog: (log: DeployLogPayload) => void; + onStatus?: (status: DeployStatusPayload) => void; + onOpen?: () => void; + onError?: (attempt: number) => void; + } +): SSEConnection { + return connectSSE(`/api/deploys/${deployId}/logs`, { + onEvent(event) { + if (event.type === 'deploy_log') { + callbacks.onLog(event.payload as DeployLogPayload); + } else if (event.type === 'deploy_status') { + callbacks.onStatus?.(event.payload as DeployStatusPayload); + } + }, + onOpen: callbacks.onOpen, + onError: callbacks.onError + }); +} + +/** + * Connect to the global events SSE stream. + * Receives instance status changes and deploy status updates. + */ +export function connectGlobalEvents(callbacks: { + onInstanceStatus?: (payload: InstanceStatusPayload) => void; + onDeployStatus?: (payload: DeployStatusPayload) => void; + onOpen?: () => void; + onError?: (attempt: number) => void; +}): SSEConnection { + return connectSSE('/api/events', { + onEvent(event) { + if (event.type === 'instance_status') { + callbacks.onInstanceStatus?.(event.payload as InstanceStatusPayload); + } else if (event.type === 'deploy_status') { + callbacks.onDeployStatus?.(event.payload as DeployStatusPayload); + } + }, + onOpen: callbacks.onOpen, + onError: callbacks.onError + }); +} diff --git a/web/src/lib/stores/instance-status.ts b/web/src/lib/stores/instance-status.ts new file mode 100644 index 0000000..66ad831 --- /dev/null +++ b/web/src/lib/stores/instance-status.ts @@ -0,0 +1,70 @@ +import { writable, get } from 'svelte/store'; +import type { InstanceStatusPayload, DeployStatusPayload } from '$lib/sse'; + +/** + * Global store for real-time instance status updates received via SSE. + * + * Components can subscribe to this store to reactively update when + * instance statuses change without polling. + */ + +interface InstanceStatusState { + /** Map of instance ID to latest status. */ + statuses: Record; + /** Timestamp of last update, useful for triggering reactive refreshes. */ + lastUpdate: number; + /** Latest deploy status events, keyed by deploy ID. */ + deployStatuses: Record; +} + +function createInstanceStatusStore() { + const { subscribe, set, update: storeUpdate } = writable({ + statuses: {}, + lastUpdate: 0, + deployStatuses: {} + }); + + return { + subscribe, + + /** Update an instance's status from an SSE event. */ + update(payload: InstanceStatusPayload) { + storeUpdate((state) => ({ + ...state, + statuses: { + ...state.statuses, + [payload.instance_id]: payload.status + }, + lastUpdate: Date.now() + })); + }, + + /** Record a deploy status change from an SSE event. */ + notifyDeploy(payload: DeployStatusPayload) { + storeUpdate((state) => ({ + ...state, + deployStatuses: { + ...state.deployStatuses, + [payload.deploy_id]: payload + }, + lastUpdate: Date.now() + })); + }, + + /** Get the current status of an instance, or undefined if not tracked. */ + getStatus(instanceId: string): string | undefined { + return get({ subscribe }).statuses[instanceId]; + }, + + /** Reset the store (e.g., on disconnect). */ + reset() { + set({ + statuses: {}, + lastUpdate: 0, + deployStatuses: {} + }); + } + }; +} + +export const instanceStatusStore = createInstanceStatusStore(); diff --git a/web/src/routes/+layout.svelte b/web/src/routes/+layout.svelte index 0590823..917f251 100644 --- a/web/src/routes/+layout.svelte +++ b/web/src/routes/+layout.svelte @@ -1,8 +1,11 @@