package events import ( "encoding/json" "sync" ) // EventType identifies the kind of event being published. type EventType string const ( // EventDeployLog is emitted when a new deploy log line is appended. EventDeployLog EventType = "deploy_log" // EventInstanceStatus is emitted when an instance status changes. EventInstanceStatus EventType = "instance_status" // EventDeployStatus is emitted when a deploy status changes. EventDeployStatus EventType = "deploy_status" ) // Event is a single event published on the bus. type Event struct { Type EventType `json:"type"` Payload any `json:"payload"` } // DeployLogPayload is the payload for EventDeployLog events. type DeployLogPayload struct { DeployID string `json:"deploy_id"` Message string `json:"message"` Level string `json:"level"` } // InstanceStatusPayload is the payload for EventInstanceStatus events. type InstanceStatusPayload struct { InstanceID string `json:"instance_id"` ProjectID string `json:"project_id"` StageID string `json:"stage_id"` Status string `json:"status"` } // DeployStatusPayload is the payload for EventDeployStatus events. type DeployStatusPayload struct { DeployID string `json:"deploy_id"` ProjectID string `json:"project_id"` StageID string `json:"stage_id"` ImageTag string `json:"image_tag"` Status string `json:"status"` Error string `json:"error,omitempty"` } // Subscriber is a channel that receives events. type Subscriber chan Event // Bus is a simple in-process pub/sub event bus. // It supports topic-based filtering and per-subscriber buffering. type Bus struct { mu sync.RWMutex subscribers map[Subscriber]subscriberInfo } type subscriberInfo struct { filter func(Event) bool } // New creates a new event bus. func New() *Bus { return &Bus{ subscribers: make(map[Subscriber]subscriberInfo), } } // Subscribe registers a new subscriber with an optional filter. // If filter is nil, the subscriber receives all events. // The returned channel is buffered to avoid blocking publishers. func (b *Bus) Subscribe(filter func(Event) bool) Subscriber { ch := make(Subscriber, 64) b.mu.Lock() b.subscribers[ch] = subscriberInfo{filter: filter} b.mu.Unlock() return ch } // Unsubscribe removes a subscriber and closes its channel. func (b *Bus) Unsubscribe(ch Subscriber) { b.mu.Lock() if _, ok := b.subscribers[ch]; ok { delete(b.subscribers, ch) close(ch) } b.mu.Unlock() } // Publish sends an event to all matching subscribers. // If a subscriber's buffer is full, the event is dropped for that subscriber // to avoid blocking the publisher. func (b *Bus) Publish(evt Event) { b.mu.RLock() defer b.mu.RUnlock() for ch, info := range b.subscribers { if info.filter != nil && !info.filter(evt) { continue } // Non-blocking send — drop if subscriber is slow. select { case ch <- evt: default: } } } // MarshalEvent serializes an event to a JSON string suitable for SSE data lines. func MarshalEvent(evt Event) (string, error) { data, err := json.Marshal(evt) if err != nil { return "", err } return string(data), nil }