91b49cb5ed
- Expand health endpoint to check DB, Docker, and NPM connectivity (FUNC-M4) - Add project_id, stage_id, offset query params to deploys endpoint (FUNC-M5, FUNC-M6) - Add notification_url field to Stage model for per-project overrides (FUNC-M2) - Add NPM Ping method for health checking - Sanitize all internal error messages in API handlers (SEC-M4) - Add audit trail events for admin actions (FUNC-M3) - Add EventLog event type to event bus
135 lines
3.5 KiB
Go
135 lines
3.5 KiB
Go
package events
|
|
|
|
import (
|
|
"encoding/json"
|
|
"sync"
|
|
)
|
|
|
|
// EventType identifies the kind of event being published.
|
|
type EventType string
|
|
|
|
const (
|
|
// EventDeployLog is emitted when a new deploy log line is appended.
|
|
EventDeployLog EventType = "deploy_log"
|
|
|
|
// EventInstanceStatus is emitted when an instance status changes.
|
|
EventInstanceStatus EventType = "instance_status"
|
|
|
|
// EventDeployStatus is emitted when a deploy status changes.
|
|
EventDeployStatus EventType = "deploy_status"
|
|
|
|
// EventLog is emitted for audit trail and operational log entries.
|
|
EventLog EventType = "event_log"
|
|
)
|
|
|
|
// Event is a single event published on the bus.
|
|
type Event struct {
|
|
Type EventType `json:"type"`
|
|
Payload any `json:"payload"`
|
|
}
|
|
|
|
// DeployLogPayload is the payload for EventDeployLog events.
|
|
type DeployLogPayload struct {
|
|
DeployID string `json:"deploy_id"`
|
|
Message string `json:"message"`
|
|
Level string `json:"level"`
|
|
}
|
|
|
|
// InstanceStatusPayload is the payload for EventInstanceStatus events.
|
|
type InstanceStatusPayload struct {
|
|
InstanceID string `json:"instance_id"`
|
|
ProjectID string `json:"project_id"`
|
|
StageID string `json:"stage_id"`
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
// DeployStatusPayload is the payload for EventDeployStatus events.
|
|
type DeployStatusPayload struct {
|
|
DeployID string `json:"deploy_id"`
|
|
ProjectID string `json:"project_id"`
|
|
StageID string `json:"stage_id"`
|
|
ImageTag string `json:"image_tag"`
|
|
Status string `json:"status"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// EventLogPayload is the payload for EventLog events (audit trail).
|
|
type EventLogPayload struct {
|
|
ID int64 `json:"id"`
|
|
Source string `json:"source"`
|
|
Severity string `json:"severity"`
|
|
Message string `json:"message"`
|
|
Metadata string `json:"metadata"`
|
|
CreatedAt string `json:"created_at"`
|
|
}
|
|
|
|
// Subscriber is a channel that receives events.
|
|
type Subscriber chan Event
|
|
|
|
// Bus is a simple in-process pub/sub event bus.
|
|
// It supports topic-based filtering and per-subscriber buffering.
|
|
type Bus struct {
|
|
mu sync.RWMutex
|
|
subscribers map[Subscriber]subscriberInfo
|
|
}
|
|
|
|
type subscriberInfo struct {
|
|
filter func(Event) bool
|
|
}
|
|
|
|
// New creates a new event bus.
|
|
func New() *Bus {
|
|
return &Bus{
|
|
subscribers: make(map[Subscriber]subscriberInfo),
|
|
}
|
|
}
|
|
|
|
// Subscribe registers a new subscriber with an optional filter.
|
|
// If filter is nil, the subscriber receives all events.
|
|
// The returned channel is buffered to avoid blocking publishers.
|
|
func (b *Bus) Subscribe(filter func(Event) bool) Subscriber {
|
|
ch := make(Subscriber, 64)
|
|
b.mu.Lock()
|
|
b.subscribers[ch] = subscriberInfo{filter: filter}
|
|
b.mu.Unlock()
|
|
return ch
|
|
}
|
|
|
|
// Unsubscribe removes a subscriber and closes its channel.
|
|
func (b *Bus) Unsubscribe(ch Subscriber) {
|
|
b.mu.Lock()
|
|
if _, ok := b.subscribers[ch]; ok {
|
|
delete(b.subscribers, ch)
|
|
close(ch)
|
|
}
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
// Publish sends an event to all matching subscribers.
|
|
// If a subscriber's buffer is full, the event is dropped for that subscriber
|
|
// to avoid blocking the publisher.
|
|
func (b *Bus) Publish(evt Event) {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
for ch, info := range b.subscribers {
|
|
if info.filter != nil && !info.filter(evt) {
|
|
continue
|
|
}
|
|
// Non-blocking send — drop if subscriber is slow.
|
|
select {
|
|
case ch <- evt:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// MarshalEvent serializes an event to a JSON string suitable for SSE data lines.
|
|
func MarshalEvent(evt Event) (string, error) {
|
|
data, err := json.Marshal(evt)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return string(data), nil
|
|
}
|