feat(docker-watcher): phase 11 - frontend embed & SSE
Embed SvelteKit static build in Go binary via go:embed. Event bus for pub/sub with deploy log, instance status, and deploy status events. SSE endpoints for real-time streaming. Frontend SSE client with exponential backoff reconnection. Makefile for build pipeline. Update Phase 12 auth plan with OAuth2/OIDC support.
This commit is contained in:
+2
-26
@@ -1,14 +1,11 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"log"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"strings"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/docker-watcher/internal/store"
|
||||
)
|
||||
|
||||
@@ -30,29 +27,8 @@ func (s *Server) listDeploys(w http.ResponseWriter, r *http.Request) {
|
||||
respondJSON(w, http.StatusOK, deploys)
|
||||
}
|
||||
|
||||
// getDeployLogs handles GET /api/deploys/{id}/logs.
|
||||
// This is an SSE stub that returns logs as JSON for now.
|
||||
// Real SSE streaming will be implemented in Phase 11.
|
||||
func (s *Server) getDeployLogs(w http.ResponseWriter, r *http.Request) {
|
||||
deployID := chi.URLParam(r, "id")
|
||||
|
||||
// Verify deploy exists.
|
||||
if _, err := s.store.GetDeployByID(deployID); err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "deploy")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "failed to get deploy: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
logs, err := s.store.GetDeployLogs(deployID)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "failed to get deploy logs: "+err.Error())
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, logs)
|
||||
}
|
||||
// NOTE: getDeployLogs has been replaced by streamDeployLogs in sse.go.
|
||||
// The new handler supports both SSE streaming and JSON fallback via Accept header.
|
||||
|
||||
// inspectRequest is the expected JSON body for POST /api/deploy/inspect.
|
||||
type inspectRequest struct {
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/docker-watcher/internal/docker"
|
||||
"github.com/alexei/docker-watcher/internal/events"
|
||||
"github.com/alexei/docker-watcher/internal/store"
|
||||
"github.com/alexei/docker-watcher/internal/webhook"
|
||||
)
|
||||
@@ -14,6 +15,7 @@ type Server struct {
|
||||
docker *docker.Client
|
||||
deployer DeployTriggerer
|
||||
webhook *webhook.Handler
|
||||
eventBus *events.Bus
|
||||
encKey [32]byte
|
||||
}
|
||||
|
||||
@@ -23,6 +25,7 @@ func NewServer(
|
||||
dockerClient *docker.Client,
|
||||
deployer DeployTriggerer,
|
||||
webhookHandler *webhook.Handler,
|
||||
eventBus *events.Bus,
|
||||
encKey [32]byte,
|
||||
) *Server {
|
||||
return &Server{
|
||||
@@ -30,6 +33,7 @@ func NewServer(
|
||||
docker: dockerClient,
|
||||
deployer: deployer,
|
||||
webhook: webhookHandler,
|
||||
eventBus: eventBus,
|
||||
encKey: encKey,
|
||||
}
|
||||
}
|
||||
@@ -73,7 +77,10 @@ func (s *Server) Router() chi.Router {
|
||||
|
||||
// Deploy endpoints.
|
||||
r.Get("/deploys", s.listDeploys)
|
||||
r.Get("/deploys/{id}/logs", s.getDeployLogs)
|
||||
r.Get("/deploys/{id}/logs", s.streamDeployLogs)
|
||||
|
||||
// SSE endpoint for real-time instance status and deploy events.
|
||||
r.Get("/events", s.streamEvents)
|
||||
|
||||
// Quick deploy endpoints.
|
||||
r.Post("/deploy/inspect", s.inspectImage)
|
||||
|
||||
@@ -0,0 +1,192 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
|
||||
"github.com/go-chi/chi/v5"
|
||||
|
||||
"github.com/alexei/docker-watcher/internal/events"
|
||||
"github.com/alexei/docker-watcher/internal/store"
|
||||
)
|
||||
|
||||
// streamDeployLogs handles GET /api/deploys/{id}/logs.
|
||||
// It supports both SSE streaming and JSON fallback based on the Accept header.
|
||||
//
|
||||
// SSE mode (Accept: text/event-stream):
|
||||
//
|
||||
// Streams deploy log events in real-time. Existing logs are sent first,
|
||||
// then new logs are pushed as they arrive via the event bus.
|
||||
//
|
||||
// JSON mode (default):
|
||||
//
|
||||
// Returns all existing deploy logs as a JSON array.
|
||||
func (s *Server) streamDeployLogs(w http.ResponseWriter, r *http.Request) {
|
||||
deployID := chi.URLParam(r, "id")
|
||||
|
||||
// Verify deploy exists.
|
||||
deploy, err := s.store.GetDeployByID(deployID)
|
||||
if err != nil {
|
||||
if errors.Is(err, store.ErrNotFound) {
|
||||
respondNotFound(w, "deploy")
|
||||
return
|
||||
}
|
||||
respondError(w, http.StatusInternalServerError, "failed to get deploy: "+err.Error())
|
||||
return
|
||||
}
|
||||
|
||||
// JSON fallback: return existing logs as array.
|
||||
accept := r.Header.Get("Accept")
|
||||
if !strings.Contains(accept, "text/event-stream") {
|
||||
logs, err := s.store.GetDeployLogs(deployID)
|
||||
if err != nil {
|
||||
respondError(w, http.StatusInternalServerError, "failed to get deploy logs: "+err.Error())
|
||||
return
|
||||
}
|
||||
respondJSON(w, http.StatusOK, logs)
|
||||
return
|
||||
}
|
||||
|
||||
// SSE mode.
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
respondError(w, http.StatusInternalServerError, "streaming not supported")
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
flusher.Flush()
|
||||
|
||||
// Send existing logs first.
|
||||
existingLogs, err := s.store.GetDeployLogs(deployID)
|
||||
if err != nil {
|
||||
log.Printf("[sse] failed to get existing deploy logs: %v", err)
|
||||
} else {
|
||||
for _, entry := range existingLogs {
|
||||
writeSSE(w, flusher, events.Event{
|
||||
Type: events.EventDeployLog,
|
||||
Payload: events.DeployLogPayload{
|
||||
DeployID: deployID,
|
||||
Message: entry.Message,
|
||||
Level: entry.Level,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// If deploy is already finished, send completion and close.
|
||||
if isTerminalStatus(deploy.Status) {
|
||||
writeSSE(w, flusher, events.Event{
|
||||
Type: events.EventDeployStatus,
|
||||
Payload: events.DeployStatusPayload{
|
||||
DeployID: deployID,
|
||||
ProjectID: deploy.ProjectID,
|
||||
StageID: deploy.StageID,
|
||||
ImageTag: deploy.ImageTag,
|
||||
Status: deploy.Status,
|
||||
Error: deploy.Error,
|
||||
},
|
||||
})
|
||||
return
|
||||
}
|
||||
|
||||
// Subscribe to new deploy log events for this deploy.
|
||||
sub := s.eventBus.Subscribe(func(evt events.Event) bool {
|
||||
switch payload := evt.Payload.(type) {
|
||||
case events.DeployLogPayload:
|
||||
return payload.DeployID == deployID
|
||||
case events.DeployStatusPayload:
|
||||
return payload.DeployID == deployID
|
||||
default:
|
||||
return false
|
||||
}
|
||||
})
|
||||
defer s.eventBus.Unsubscribe(sub)
|
||||
|
||||
ctx := r.Context()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case evt, ok := <-sub:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
writeSSE(w, flusher, evt)
|
||||
|
||||
// Close stream when deploy reaches terminal status.
|
||||
if evt.Type == events.EventDeployStatus {
|
||||
if payload, ok := evt.Payload.(events.DeployStatusPayload); ok {
|
||||
if isTerminalStatus(payload.Status) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// streamEvents handles GET /api/events.
|
||||
// It streams instance status changes and deploy status changes via SSE.
|
||||
func (s *Server) streamEvents(w http.ResponseWriter, r *http.Request) {
|
||||
flusher, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
respondError(w, http.StatusInternalServerError, "streaming not supported")
|
||||
return
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "text/event-stream")
|
||||
w.Header().Set("Cache-Control", "no-cache")
|
||||
w.Header().Set("Connection", "keep-alive")
|
||||
w.Header().Set("X-Accel-Buffering", "no")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
flusher.Flush()
|
||||
|
||||
// Subscribe to instance status and deploy status events.
|
||||
sub := s.eventBus.Subscribe(func(evt events.Event) bool {
|
||||
return evt.Type == events.EventInstanceStatus || evt.Type == events.EventDeployStatus
|
||||
})
|
||||
defer s.eventBus.Unsubscribe(sub)
|
||||
|
||||
ctx := r.Context()
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case evt, ok := <-sub:
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
writeSSE(w, flusher, evt)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// writeSSE writes a single SSE event to the response writer and flushes.
|
||||
func writeSSE(w http.ResponseWriter, flusher http.Flusher, evt events.Event) {
|
||||
data, err := json.Marshal(evt)
|
||||
if err != nil {
|
||||
log.Printf("[sse] marshal event: %v", err)
|
||||
return
|
||||
}
|
||||
fmt.Fprintf(w, "data: %s\n\n", data)
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
// isTerminalStatus returns true if the deploy status is final.
|
||||
func isTerminalStatus(status string) bool {
|
||||
switch status {
|
||||
case "success", "failed", "rolled_back":
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,42 @@
|
||||
package api
|
||||
|
||||
import (
|
||||
"io/fs"
|
||||
"net/http"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// StaticHandler serves embedded SPA files with fallback to index.html
|
||||
// for all non-API routes (SPA client-side routing support).
|
||||
func StaticHandler(webFS fs.FS) http.Handler {
|
||||
fileServer := http.FileServer(http.FS(webFS))
|
||||
|
||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||
// Skip API routes — they are handled by the API router.
|
||||
if strings.HasPrefix(r.URL.Path, "/api") {
|
||||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// Try to serve the exact file.
|
||||
path := strings.TrimPrefix(r.URL.Path, "/")
|
||||
if path == "" {
|
||||
path = "index.html"
|
||||
}
|
||||
|
||||
// Check if file exists in the embedded FS.
|
||||
f, err := webFS.Open(path)
|
||||
if err == nil {
|
||||
f.Close()
|
||||
// Clear the JSON content-type set by middleware — let file server decide.
|
||||
w.Header().Del("Content-Type")
|
||||
fileServer.ServeHTTP(w, r)
|
||||
return
|
||||
}
|
||||
|
||||
// File not found: serve index.html for SPA client-side routing.
|
||||
r.URL.Path = "/"
|
||||
w.Header().Del("Content-Type")
|
||||
fileServer.ServeHTTP(w, r)
|
||||
})
|
||||
}
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
"github.com/alexei/docker-watcher/internal/crypto"
|
||||
"github.com/alexei/docker-watcher/internal/docker"
|
||||
"github.com/alexei/docker-watcher/internal/events"
|
||||
"github.com/alexei/docker-watcher/internal/health"
|
||||
"github.com/alexei/docker-watcher/internal/notify"
|
||||
"github.com/alexei/docker-watcher/internal/npm"
|
||||
@@ -25,9 +26,15 @@ type Deployer struct {
|
||||
store *store.Store
|
||||
health *health.Checker
|
||||
notifier *notify.Notifier
|
||||
eventBus EventPublisher
|
||||
encKey [32]byte
|
||||
}
|
||||
|
||||
// EventPublisher is the interface for publishing events to the event bus.
|
||||
type EventPublisher interface {
|
||||
Publish(evt events.Event)
|
||||
}
|
||||
|
||||
// New creates a new Deployer with all required dependencies.
|
||||
func New(
|
||||
dockerClient *docker.Client,
|
||||
@@ -35,6 +42,7 @@ func New(
|
||||
st *store.Store,
|
||||
checker *health.Checker,
|
||||
notifier *notify.Notifier,
|
||||
eventBus EventPublisher,
|
||||
encKey [32]byte,
|
||||
) *Deployer {
|
||||
return &Deployer{
|
||||
@@ -43,6 +51,7 @@ func New(
|
||||
store: st,
|
||||
health: checker,
|
||||
notifier: notifier,
|
||||
eventBus: eventBus,
|
||||
encKey: encKey,
|
||||
}
|
||||
}
|
||||
@@ -91,6 +100,7 @@ func (d *Deployer) TriggerDeploy(ctx context.Context, projectID, stageID, imageT
|
||||
|
||||
if deployErr != nil {
|
||||
d.logDeploy(deploy.ID, fmt.Sprintf("Deploy failed: %v", deployErr), "error")
|
||||
d.publishDeployStatus(deploy.ID, projectID, stageID, imageTag, "failed", deployErr.Error())
|
||||
d.rollback(ctx, deploy.ID, containerID, npmProxyID, instanceID)
|
||||
|
||||
d.notifier.Send(settings.NotificationURL, notify.Event{
|
||||
@@ -108,6 +118,7 @@ func (d *Deployer) TriggerDeploy(ctx context.Context, projectID, stageID, imageT
|
||||
if err := d.store.UpdateDeployStatus(deploy.ID, "success", ""); err != nil {
|
||||
log.Printf("deployer: update deploy status to success: %v", err)
|
||||
}
|
||||
d.publishDeployStatus(deploy.ID, projectID, stageID, imageTag, "success", "")
|
||||
|
||||
subdomain := d.buildSubdomain(project, stage, settings, imageTag)
|
||||
fullURL := fmt.Sprintf("https://%s.%s", subdomain, settings.Domain)
|
||||
@@ -144,6 +155,7 @@ func (d *Deployer) executeDeploy(
|
||||
if err := d.store.UpdateDeployStatus(deployID, "pulling", ""); err != nil {
|
||||
log.Printf("deployer: update deploy status: %v", err)
|
||||
}
|
||||
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "pulling", "")
|
||||
d.logDeploy(deployID, fmt.Sprintf("Pulling image %s:%s", project.Image, imageTag), "info")
|
||||
|
||||
authConfig, err := d.buildRegistryAuth(project)
|
||||
@@ -167,6 +179,7 @@ func (d *Deployer) executeDeploy(
|
||||
if err := d.store.UpdateDeployStatus(deployID, "starting", ""); err != nil {
|
||||
log.Printf("deployer: update deploy status: %v", err)
|
||||
}
|
||||
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "starting", "")
|
||||
|
||||
// Pre-generate instance ID so it can be set as a container label.
|
||||
instanceID = uuid.New().String()
|
||||
@@ -224,12 +237,14 @@ func (d *Deployer) executeDeploy(
|
||||
if err := d.store.UpdateInstanceStatus(instanceID, "running"); err != nil {
|
||||
log.Printf("deployer: update instance status to running: %v", err)
|
||||
}
|
||||
d.publishInstanceStatus(instanceID, project.ID, stage.ID, "running")
|
||||
d.logDeploy(deployID, "Container started", "info")
|
||||
|
||||
// Step 4: Configure NPM proxy.
|
||||
if err := d.store.UpdateDeployStatus(deployID, "configuring_proxy", ""); err != nil {
|
||||
log.Printf("deployer: update deploy status: %v", err)
|
||||
}
|
||||
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "configuring_proxy", "")
|
||||
|
||||
npmProxyID, err = d.configureProxy(ctx, deployID, settings, containerName, project.Port, subdomain)
|
||||
if err != nil {
|
||||
@@ -248,6 +263,7 @@ func (d *Deployer) executeDeploy(
|
||||
if err := d.store.UpdateDeployStatus(deployID, "health_checking", ""); err != nil {
|
||||
log.Printf("deployer: update deploy status: %v", err)
|
||||
}
|
||||
d.publishDeployStatus(deployID, project.ID, stage.ID, imageTag, "health_checking", "")
|
||||
|
||||
healthURL := fmt.Sprintf("http://%s:%d%s", containerName, project.Port, project.Healthcheck)
|
||||
d.logDeploy(deployID, fmt.Sprintf("Running health check: %s", healthURL), "info")
|
||||
@@ -466,11 +482,54 @@ func (d *Deployer) parseEnvVars(envJSON string) []string {
|
||||
return vars
|
||||
}
|
||||
|
||||
// logDeploy appends a log entry for a deploy. Errors are logged to stderr but not propagated.
|
||||
// logDeploy appends a log entry for a deploy and publishes it on the event bus.
|
||||
// Errors are logged to stderr but not propagated.
|
||||
func (d *Deployer) logDeploy(deployID, message, level string) {
|
||||
if err := d.store.AppendDeployLog(deployID, message, level); err != nil {
|
||||
log.Printf("deployer: append deploy log: %v", err)
|
||||
}
|
||||
if d.eventBus != nil {
|
||||
d.eventBus.Publish(events.Event{
|
||||
Type: events.EventDeployLog,
|
||||
Payload: events.DeployLogPayload{
|
||||
DeployID: deployID,
|
||||
Message: message,
|
||||
Level: level,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// publishDeployStatus publishes a deploy status change event on the bus.
|
||||
func (d *Deployer) publishDeployStatus(deployID, projectID, stageID, imageTag, status, deployErr string) {
|
||||
if d.eventBus != nil {
|
||||
d.eventBus.Publish(events.Event{
|
||||
Type: events.EventDeployStatus,
|
||||
Payload: events.DeployStatusPayload{
|
||||
DeployID: deployID,
|
||||
ProjectID: projectID,
|
||||
StageID: stageID,
|
||||
ImageTag: imageTag,
|
||||
Status: status,
|
||||
Error: deployErr,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// publishInstanceStatus publishes an instance status change event on the bus.
|
||||
func (d *Deployer) publishInstanceStatus(instanceID, projectID, stageID, status string) {
|
||||
if d.eventBus != nil {
|
||||
d.eventBus.Publish(events.Event{
|
||||
Type: events.EventInstanceStatus,
|
||||
Payload: events.InstanceStatusPayload{
|
||||
InstanceID: instanceID,
|
||||
ProjectID: projectID,
|
||||
StageID: stageID,
|
||||
Status: status,
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// truncateID safely truncates a Docker ID to 12 characters for display.
|
||||
|
||||
@@ -0,0 +1,121 @@
|
||||
package events
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"sync"
|
||||
)
|
||||
|
||||
// EventType identifies the kind of event being published.
|
||||
type EventType string
|
||||
|
||||
const (
|
||||
// EventDeployLog is emitted when a new deploy log line is appended.
|
||||
EventDeployLog EventType = "deploy_log"
|
||||
|
||||
// EventInstanceStatus is emitted when an instance status changes.
|
||||
EventInstanceStatus EventType = "instance_status"
|
||||
|
||||
// EventDeployStatus is emitted when a deploy status changes.
|
||||
EventDeployStatus EventType = "deploy_status"
|
||||
)
|
||||
|
||||
// Event is a single event published on the bus.
|
||||
type Event struct {
|
||||
Type EventType `json:"type"`
|
||||
Payload any `json:"payload"`
|
||||
}
|
||||
|
||||
// DeployLogPayload is the payload for EventDeployLog events.
|
||||
type DeployLogPayload struct {
|
||||
DeployID string `json:"deploy_id"`
|
||||
Message string `json:"message"`
|
||||
Level string `json:"level"`
|
||||
}
|
||||
|
||||
// InstanceStatusPayload is the payload for EventInstanceStatus events.
|
||||
type InstanceStatusPayload struct {
|
||||
InstanceID string `json:"instance_id"`
|
||||
ProjectID string `json:"project_id"`
|
||||
StageID string `json:"stage_id"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// DeployStatusPayload is the payload for EventDeployStatus events.
|
||||
type DeployStatusPayload struct {
|
||||
DeployID string `json:"deploy_id"`
|
||||
ProjectID string `json:"project_id"`
|
||||
StageID string `json:"stage_id"`
|
||||
ImageTag string `json:"image_tag"`
|
||||
Status string `json:"status"`
|
||||
Error string `json:"error,omitempty"`
|
||||
}
|
||||
|
||||
// Subscriber is a channel that receives events.
|
||||
type Subscriber chan Event
|
||||
|
||||
// Bus is a simple in-process pub/sub event bus.
|
||||
// It supports topic-based filtering and per-subscriber buffering.
|
||||
type Bus struct {
|
||||
mu sync.RWMutex
|
||||
subscribers map[Subscriber]subscriberInfo
|
||||
}
|
||||
|
||||
type subscriberInfo struct {
|
||||
filter func(Event) bool
|
||||
}
|
||||
|
||||
// New creates a new event bus.
|
||||
func New() *Bus {
|
||||
return &Bus{
|
||||
subscribers: make(map[Subscriber]subscriberInfo),
|
||||
}
|
||||
}
|
||||
|
||||
// Subscribe registers a new subscriber with an optional filter.
|
||||
// If filter is nil, the subscriber receives all events.
|
||||
// The returned channel is buffered to avoid blocking publishers.
|
||||
func (b *Bus) Subscribe(filter func(Event) bool) Subscriber {
|
||||
ch := make(Subscriber, 64)
|
||||
b.mu.Lock()
|
||||
b.subscribers[ch] = subscriberInfo{filter: filter}
|
||||
b.mu.Unlock()
|
||||
return ch
|
||||
}
|
||||
|
||||
// Unsubscribe removes a subscriber and closes its channel.
|
||||
func (b *Bus) Unsubscribe(ch Subscriber) {
|
||||
b.mu.Lock()
|
||||
if _, ok := b.subscribers[ch]; ok {
|
||||
delete(b.subscribers, ch)
|
||||
close(ch)
|
||||
}
|
||||
b.mu.Unlock()
|
||||
}
|
||||
|
||||
// Publish sends an event to all matching subscribers.
|
||||
// If a subscriber's buffer is full, the event is dropped for that subscriber
|
||||
// to avoid blocking the publisher.
|
||||
func (b *Bus) Publish(evt Event) {
|
||||
b.mu.RLock()
|
||||
defer b.mu.RUnlock()
|
||||
|
||||
for ch, info := range b.subscribers {
|
||||
if info.filter != nil && !info.filter(evt) {
|
||||
continue
|
||||
}
|
||||
// Non-blocking send — drop if subscriber is slow.
|
||||
select {
|
||||
case ch <- evt:
|
||||
default:
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// MarshalEvent serializes an event to a JSON string suitable for SSE data lines.
|
||||
func MarshalEvent(evt Event) (string, error) {
|
||||
data, err := json.Marshal(evt)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return string(data), nil
|
||||
}
|
||||
Reference in New Issue
Block a user