package api import ( "encoding/json" "fmt" "log/slog" "net/http" "time" "github.com/alexei/tinyforge/internal/events" ) // streamEvents handles GET /api/events. // It streams instance status changes and deploy status changes via SSE. func (s *Server) streamEvents(w http.ResponseWriter, r *http.Request) { release, ok := acquireSSESlot(w, s.sseGate) if !ok { return } defer release() flusher, ok := w.(http.Flusher) if !ok { respondError(w, http.StatusInternalServerError, "streaming not supported") return } w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") w.Header().Set("X-Accel-Buffering", "no") w.WriteHeader(http.StatusOK) flusher.Flush() // Subscribe to instance status, deploy status, and persistent event log events. sub := s.eventBus.Subscribe(func(evt events.Event) bool { return evt.Type == events.EventInstanceStatus || evt.Type == events.EventDeployStatus || evt.Type == events.EventLog }) defer s.eventBus.Unsubscribe(sub) // Periodic heartbeat so the browser detects dead connections. heartbeat := time.NewTicker(30 * time.Second) defer heartbeat.Stop() ctx := r.Context() for { select { case <-ctx.Done(): return case evt, ok := <-sub: if !ok { return } writeSSE(w, flusher, evt) case <-heartbeat.C: // SSE comment line — keeps the connection alive without triggering onmessage. fmt.Fprintf(w, ": heartbeat\n\n") flusher.Flush() } } } // writeSSE writes a single SSE event to the response writer and flushes. func writeSSE(w http.ResponseWriter, flusher http.Flusher, evt events.Event) { data, err := json.Marshal(evt) if err != nil { slog.Error("marshal SSE event", "error", err) return } fmt.Fprintf(w, "data: %s\n\n", data) flusher.Flush() }