package api import ( "encoding/json" "errors" "fmt" "log/slog" "net/http" "strings" "time" "github.com/go-chi/chi/v5" "github.com/alexei/tinyforge/internal/events" "github.com/alexei/tinyforge/internal/store" ) // streamDeployLogs handles GET /api/deploys/{id}/logs. // It supports both SSE streaming and JSON fallback based on the Accept header. // // SSE mode (Accept: text/event-stream): // // Streams deploy log events in real-time. Existing logs are sent first, // then new logs are pushed as they arrive via the event bus. // // JSON mode (default): // // Returns all existing deploy logs as a JSON array. func (s *Server) streamDeployLogs(w http.ResponseWriter, r *http.Request) { deployID := chi.URLParam(r, "id") // Verify deploy exists. deploy, err := s.store.GetDeployByID(deployID) if err != nil { if errors.Is(err, store.ErrNotFound) { respondNotFound(w, "deploy") return } slog.Error("failed to get deploy", "error", err) respondError(w, http.StatusInternalServerError, "internal server error") return } // JSON fallback: return existing logs as array. accept := r.Header.Get("Accept") if !strings.Contains(accept, "text/event-stream") { logs, err := s.store.GetDeployLogs(deployID) if err != nil { slog.Error("failed to get deploy logs", "error", err) respondError(w, http.StatusInternalServerError, "internal server error") return } respondJSON(w, http.StatusOK, logs) return } // SSE mode. flusher, ok := w.(http.Flusher) if !ok { respondError(w, http.StatusInternalServerError, "streaming not supported") return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) flusher.Flush() // Send existing logs first. existingLogs, err := s.store.GetDeployLogs(deployID) if err != nil { slog.Error("get existing deploy logs", "error", err) } else { for _, entry := range existingLogs { writeSSE(w, flusher, events.Event{ Type: events.EventDeployLog, Payload: events.DeployLogPayload{ DeployID: deployID, Message: entry.Message, Level: entry.Level, }, }) } } // If deploy is already finished, send completion and close. if isTerminalStatus(deploy.Status) { writeSSE(w, flusher, events.Event{ Type: events.EventDeployStatus, Payload: events.DeployStatusPayload{ DeployID: deployID, ProjectID: deploy.ProjectID, StageID: deploy.StageID, ImageTag: deploy.ImageTag, Status: deploy.Status, Error: deploy.Error, }, }) return } // Subscribe to new deploy log events for this deploy. sub := s.eventBus.Subscribe(func(evt events.Event) bool { switch payload := evt.Payload.(type) { case events.DeployLogPayload: return payload.DeployID == deployID case events.DeployStatusPayload: return payload.DeployID == deployID default: return false } }) defer s.eventBus.Unsubscribe(sub) ctx := r.Context() for { select { case <-ctx.Done(): return case evt, ok := <-sub: if !ok { return } writeSSE(w, flusher, evt) // Close stream when deploy reaches terminal status. if evt.Type == events.EventDeployStatus { if payload, ok := evt.Payload.(events.DeployStatusPayload); ok { if isTerminalStatus(payload.Status) { return } } } } } } // streamEvents handles GET /api/events. // It streams instance status changes and deploy status changes via SSE. func (s *Server) streamEvents(w http.ResponseWriter, r *http.Request) { flusher, ok := w.(http.Flusher) if !ok { respondError(w, http.StatusInternalServerError, "streaming not supported") return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) flusher.Flush() // Subscribe to instance status, deploy status, and persistent event log events. sub := s.eventBus.Subscribe(func(evt events.Event) bool { return evt.Type == events.EventInstanceStatus || evt.Type == events.EventDeployStatus || evt.Type == events.EventLog }) defer s.eventBus.Unsubscribe(sub) // Periodic heartbeat so the browser detects dead connections. heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() ctx := r.Context() for { select { case <-ctx.Done(): return case evt, ok := <-sub: if !ok { return } writeSSE(w, flusher, evt) case <-heartbeat.C: // SSE comment line — keeps the connection alive without triggering onmessage. fmt.Fprintf(w, ": heartbeat\n\n") flusher.Flush() } } } // writeSSE writes a single SSE event to the response writer and flushes. func writeSSE(w http.ResponseWriter, flusher http.Flusher, evt events.Event) { data, err := json.Marshal(evt) if err != nil { slog.Error("marshal SSE event", "error", err) return } fmt.Fprintf(w, "data: %s\n\n", data) flusher.Flush() } // isTerminalStatus returns true if the deploy status is final. func isTerminalStatus(status string) bool { return store.IsTerminalDeployStatus(status) }