75424a5f25
Build / build (push) Successful in 10m42s
Adds a new Stacks feature: upload/edit docker-compose YAML, deploy as atomic units, browse revisions, roll back, and stream logs. Backend in internal/stack + internal/api/stacks.go, persistent storage in internal/store/stacks.go. Stacks pages (list, new, detail) use a modern Forge aesthetic — Instrument Serif display type, JetBrains Mono for meta/code, indigo ember accents, dot-grid hero, registration marks on hover, terminal panel for logs. Palette is sourced from the app's existing design tokens so the feature remains consistent with the rest of Tinyforge. Fonts self-hosted via @fontsource/instrument-serif and @fontsource/jetbrains-mono to satisfy the strict CSP.
209 lines
5.7 KiB
Go
209 lines
5.7 KiB
Go
package events
|
|
|
|
import (
|
|
"encoding/json"
|
|
"log/slog"
|
|
"sync"
|
|
)
|
|
|
|
// EventType identifies the kind of event being published.
|
|
type EventType string
|
|
|
|
const (
|
|
// EventDeployLog is emitted when a new deploy log line is appended.
|
|
EventDeployLog EventType = "deploy_log"
|
|
|
|
// EventInstanceStatus is emitted when an instance status changes.
|
|
EventInstanceStatus EventType = "instance_status"
|
|
|
|
// EventDeployStatus is emitted when a deploy status changes.
|
|
EventDeployStatus EventType = "deploy_status"
|
|
|
|
// EventLog is emitted for audit trail and operational log entries.
|
|
EventLog EventType = "event_log"
|
|
|
|
// EventStaticSiteStatus is emitted when a static site status changes.
|
|
EventStaticSiteStatus EventType = "static_site_status"
|
|
|
|
// EventStackStatus is emitted when a compose stack status changes.
|
|
EventStackStatus EventType = "stack_status"
|
|
)
|
|
|
|
// Event is a single event published on the bus.
|
|
type Event struct {
|
|
Type EventType `json:"type"`
|
|
Payload any `json:"payload"`
|
|
}
|
|
|
|
// DeployLogPayload is the payload for EventDeployLog events.
|
|
type DeployLogPayload struct {
|
|
DeployID string `json:"deploy_id"`
|
|
Message string `json:"message"`
|
|
Level string `json:"level"`
|
|
}
|
|
|
|
// InstanceStatusPayload is the payload for EventInstanceStatus events.
|
|
type InstanceStatusPayload struct {
|
|
InstanceID string `json:"instance_id"`
|
|
ProjectID string `json:"project_id"`
|
|
StageID string `json:"stage_id"`
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
// DeployStatusPayload is the payload for EventDeployStatus events.
|
|
type DeployStatusPayload struct {
|
|
DeployID string `json:"deploy_id"`
|
|
ProjectID string `json:"project_id"`
|
|
StageID string `json:"stage_id"`
|
|
ImageTag string `json:"image_tag"`
|
|
Status string `json:"status"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// EventLogPayload is the payload for EventLog events (audit trail).
|
|
type EventLogPayload struct {
|
|
ID int64 `json:"id"`
|
|
Source string `json:"source"`
|
|
Severity string `json:"severity"`
|
|
Message string `json:"message"`
|
|
Metadata string `json:"metadata"`
|
|
CreatedAt string `json:"created_at"`
|
|
}
|
|
|
|
// StaticSiteStatusPayload is the payload for EventStaticSiteStatus events.
|
|
type StaticSiteStatusPayload struct {
|
|
SiteID string `json:"site_id"`
|
|
Name string `json:"name"`
|
|
Status string `json:"status"`
|
|
}
|
|
|
|
// StackStatusPayload is the payload for EventStackStatus events.
|
|
type StackStatusPayload struct {
|
|
StackID string `json:"stack_id"`
|
|
Name string `json:"name"`
|
|
Status string `json:"status"`
|
|
Error string `json:"error,omitempty"`
|
|
}
|
|
|
|
// Subscriber is a channel that receives events.
|
|
type Subscriber chan Event
|
|
|
|
// Bus is a simple in-process pub/sub event bus.
|
|
// It supports topic-based filtering and per-subscriber buffering.
|
|
type Bus struct {
|
|
mu sync.RWMutex
|
|
subscribers map[Subscriber]subscriberInfo
|
|
}
|
|
|
|
type subscriberInfo struct {
|
|
filter func(Event) bool
|
|
}
|
|
|
|
// New creates a new event bus.
|
|
func New() *Bus {
|
|
return &Bus{
|
|
subscribers: make(map[Subscriber]subscriberInfo),
|
|
}
|
|
}
|
|
|
|
// Subscribe registers a new subscriber with an optional filter.
|
|
// If filter is nil, the subscriber receives all events.
|
|
// The returned channel is buffered to avoid blocking publishers.
|
|
func (b *Bus) Subscribe(filter func(Event) bool) Subscriber {
|
|
ch := make(Subscriber, 64)
|
|
b.mu.Lock()
|
|
b.subscribers[ch] = subscriberInfo{filter: filter}
|
|
b.mu.Unlock()
|
|
return ch
|
|
}
|
|
|
|
// Unsubscribe removes a subscriber and closes its channel.
|
|
func (b *Bus) Unsubscribe(ch Subscriber) {
|
|
b.mu.Lock()
|
|
if _, ok := b.subscribers[ch]; ok {
|
|
delete(b.subscribers, ch)
|
|
close(ch)
|
|
}
|
|
b.mu.Unlock()
|
|
}
|
|
|
|
// Publish sends an event to all matching subscribers.
|
|
// If a subscriber's buffer is full, the event is dropped for that subscriber
|
|
// to avoid blocking the publisher.
|
|
func (b *Bus) Publish(evt Event) {
|
|
b.mu.RLock()
|
|
defer b.mu.RUnlock()
|
|
|
|
for ch, info := range b.subscribers {
|
|
if info.filter != nil && !info.filter(evt) {
|
|
continue
|
|
}
|
|
// Non-blocking send — drop if subscriber is slow.
|
|
select {
|
|
case ch <- evt:
|
|
default:
|
|
}
|
|
}
|
|
}
|
|
|
|
// PersistFunc is a callback that persists an event log entry.
|
|
// It receives source, severity, message, and metadata (JSON string).
|
|
// It returns the persisted entry's ID and created_at timestamp.
|
|
type PersistFunc func(source, severity, message, metadata string) (int64, string, error)
|
|
|
|
// RegisterPersistentLogger subscribes to the bus and auto-persists warn/error
|
|
// events by calling the provided persist function. It also re-publishes the
|
|
// persisted event as an EventLog so SSE clients receive it in real-time.
|
|
// Call the returned function to unsubscribe.
|
|
func (b *Bus) RegisterPersistentLogger(persist PersistFunc) func() {
|
|
sub := b.Subscribe(func(evt Event) bool {
|
|
if evt.Type != EventDeployLog {
|
|
return false
|
|
}
|
|
p, ok := evt.Payload.(DeployLogPayload)
|
|
if !ok {
|
|
return false
|
|
}
|
|
return p.Level == "warn" || p.Level == "error"
|
|
})
|
|
|
|
go func() {
|
|
for evt := range sub {
|
|
p, ok := evt.Payload.(DeployLogPayload)
|
|
if !ok {
|
|
continue
|
|
}
|
|
metaBytes, _ := json.Marshal(map[string]string{"deploy_id": p.DeployID})
|
|
metadata := string(metaBytes)
|
|
id, createdAt, err := persist("deploy", p.Level, p.Message, metadata)
|
|
if err != nil {
|
|
slog.Error("failed to persist event log", "source", "deploy", "level", p.Level, "error", err)
|
|
continue
|
|
}
|
|
|
|
b.Publish(Event{
|
|
Type: EventLog,
|
|
Payload: EventLogPayload{
|
|
ID: id,
|
|
Source: "deploy",
|
|
Severity: p.Level,
|
|
Message: p.Message,
|
|
Metadata: metadata,
|
|
CreatedAt: createdAt,
|
|
},
|
|
})
|
|
}
|
|
}()
|
|
|
|
return func() { b.Unsubscribe(sub) }
|
|
}
|
|
|
|
// MarshalEvent serializes an event to a JSON string suitable for SSE data lines.
|
|
func MarshalEvent(evt Event) (string, error) {
|
|
data, err := json.Marshal(evt)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return string(data), nil
|
|
}
|