Files
alexei.dolgolyov 93b6911b34 feat(apps): per-app deploy/activity timeline
Every deploy across all four source kinds now writes a workload-scoped
event via a shared plugin.EmitDeployEvent helper (replacing the inline
emit duplicated in static/dockerfile, standardizing static's metadata
key site_id->workload_id, and adding emission to image+compose which
were silent). New indexed event_log.workload_id column,
EventLogFilter.WorkloadID, and GET /api/workloads/{id}/events (id pinned from path).

Frontend: a forge "Activity" panel on /apps/[id] reusing EventLogEntry,
live SSE prepend filtered by workload_id, load-more pagination, an
All/Errors severity filter, and a shared toEventLogEntry mapper. en/ru
i18n parity.

Security: compose's failure status emits a generic reason instead of raw
`docker compose up` output, which can echo app secrets and egresses to
operator webhooks (NotificationURL + event-trigger actions); full detail
stays only in the returned error. Rune-safe 256-rune status cap.

Reviewed: go + typescript APPROVE; security HIGH fixed.
2026-05-29 13:51:17 +03:00

225 lines
6.4 KiB
Go

package events
import (
"encoding/json"
"log/slog"
"sync"
)
// EventType names the category of an Event carried on the bus. Its string
// value is what gets serialized into the event's "type" JSON field.
type EventType string

// Event types published on the bus.
const (
	// EventDeployLog fires whenever a new line is appended to a deploy log.
	EventDeployLog = EventType("deploy_log")

	// EventInstanceStatus fires when the status of an instance changes.
	EventInstanceStatus = EventType("instance_status")

	// EventDeployStatus fires when the status of a deploy changes.
	EventDeployStatus = EventType("deploy_status")

	// EventLog carries audit-trail and operational log entries.
	EventLog = EventType("event_log")

	// EventStaticSiteStatus fires when the status of a static site changes.
	EventStaticSiteStatus = EventType("static_site_status")

	// EventStackStatus fires when the status of a compose stack changes.
	EventStackStatus = EventType("stack_status")

	// EventBuildLog fires once per line of a streaming image build.
	// These per-line events are ephemeral: they are not written to the
	// event_log and exist only to feed a live tail UI during the slow
	// "building" phase of a dockerfile-source deploy. Every dockerfile
	// deploy on the box publishes on the same bus, so subscribers should
	// filter by WorkloadID.
	EventBuildLog = EventType("build_log")
)
// Event is the envelope published on the bus: a type tag plus the data that
// goes with it. Both fields are exported to JSON ("type" and "payload").
type Event struct {
	// Type says which kind of event this is.
	Type EventType `json:"type"`
	// Payload is the event-specific body, for example a DeployLogPayload
	// for EventDeployLog events. It is untyped, so consumers must
	// type-assert it before use.
	Payload any `json:"payload"`
}
// DeployLogPayload is the body of an EventDeployLog event.
//
// RegisterPersistentLogger persists only payloads whose Level is "warn" or
// "error"; its filter rejects every other level.
type DeployLogPayload struct {
	DeployID string `json:"deploy_id"` // copied into the persisted entry's metadata under "deploy_id"
	Message  string `json:"message"`   // becomes the persisted entry's message
	Level    string `json:"level"`     // becomes the persisted entry's severity
}
// InstanceStatusPayload is the body of an EventInstanceStatus event, which
// is emitted when an instance status changes. Every field is a plain string
// serialized under the snake_case key given in its tag.
type InstanceStatusPayload struct {
	InstanceID string `json:"instance_id"`
	ProjectID  string `json:"project_id"`
	StageID    string `json:"stage_id"`
	Status     string `json:"status"`
}
// DeployStatusPayload is the body of an EventDeployStatus event, which is
// emitted when a deploy status changes. Every field is a plain string.
type DeployStatusPayload struct {
	DeployID  string `json:"deploy_id"`
	ProjectID string `json:"project_id"`
	StageID   string `json:"stage_id"`
	ImageTag  string `json:"image_tag"`
	Status    string `json:"status"`
	// Error is left out of the JSON output entirely when it is empty.
	Error string `json:"error,omitempty"`
}
// EventLogPayload is the body of an EventLog event (the audit trail).
//
// When RegisterPersistentLogger builds one, ID and CreatedAt come from the
// persist callback, Metadata is a JSON-encoded string, and WorkloadID is
// left empty.
type EventLogPayload struct {
	ID         int64  `json:"id"`
	Source     string `json:"source"`
	WorkloadID string `json:"workload_id"`
	Severity   string `json:"severity"`
	Message    string `json:"message"`
	Metadata   string `json:"metadata"`
	CreatedAt  string `json:"created_at"`
}
// StaticSiteStatusPayload is the body of an EventStaticSiteStatus event,
// which is emitted when a static site status changes. Every field is a
// plain string serialized under the snake_case key given in its tag.
type StaticSiteStatusPayload struct {
	SiteID string `json:"site_id"`
	Name   string `json:"name"`
	Status string `json:"status"`
}
// BuildLogPayload is the body of an EventBuildLog event. One such event is
// published for each non-empty line read off the daemon's NDJSON build
// stream.
type BuildLogPayload struct {
	WorkloadID string `json:"workload_id"`
	Line       string `json:"line"`
	// Stream is left out of the JSON output entirely when it is empty.
	Stream string `json:"stream,omitempty"`
}
// StackStatusPayload is the body of an EventStackStatus event, which is
// emitted when a compose stack status changes. Every field is a plain
// string.
type StackStatusPayload struct {
	StackID string `json:"stack_id"`
	Name    string `json:"name"`
	Status  string `json:"status"`
	// Error is left out of the JSON output entirely when it is empty.
	Error string `json:"error,omitempty"`
}
// Subscriber is a channel that receives events. Values of this type are
// created by Bus.Subscribe and double as the map key identifying the
// subscription; Bus.Unsubscribe closes the channel, so receivers can simply
// range over it until it is closed.
type Subscriber chan Event
// Bus is a simple in-process publish/subscribe hub. Each subscriber may
// supply a filter predicate and receives matching events on its own
// buffered channel.
//
// A Bus contains a mutex and must not be copied; create one with New and
// share the pointer.
type Bus struct {
	// mu guards subscribers: writers (Subscribe/Unsubscribe) take the
	// write lock, Publish takes the read lock.
	mu sync.RWMutex
	// subscribers maps each live subscription channel to its settings.
	subscribers map[Subscriber]subscriberInfo
}
// subscriberInfo holds the per-subscription settings stored in Bus.
type subscriberInfo struct {
	// filter decides whether an event is delivered to the subscriber.
	// A nil filter means every event is delivered.
	filter func(Event) bool
}
// New returns an empty, ready-to-use event bus with no subscribers.
func New() *Bus {
	bus := new(Bus)
	bus.subscribers = make(map[Subscriber]subscriberInfo)
	return bus
}
// Subscribe adds a subscription to the bus and returns its channel.
//
// filter is optional: when it is nil the subscriber is sent every published
// event, otherwise only events for which filter returns true. The channel
// has room for 64 pending events so that publishers never have to wait on a
// slow reader. Pass the returned channel to Unsubscribe to end the
// subscription.
func (b *Bus) Subscribe(filter func(Event) bool) Subscriber {
	sub := make(Subscriber, 64)

	b.mu.Lock()
	defer b.mu.Unlock()

	b.subscribers[sub] = subscriberInfo{filter: filter}
	return sub
}
// Unsubscribe drops ch from the bus and closes it, which ends any
// `for range` loop reading from it.
//
// Channels that are not (or no longer) registered are ignored, so calling
// Unsubscribe twice for the same channel is harmless and never closes a
// channel a second time.
func (b *Bus) Unsubscribe(ch Subscriber) {
	b.mu.Lock()
	defer b.mu.Unlock()

	if _, registered := b.subscribers[ch]; !registered {
		return
	}
	delete(b.subscribers, ch)
	// Closing under the write lock means no Publish can be mid-send on ch.
	close(ch)
}
// Publish delivers evt to every subscriber whose filter accepts it
// (subscribers without a filter accept everything).
//
// Delivery never blocks: if a subscriber's buffer is already full, the
// event is silently dropped for that subscriber only. Filters run while the
// bus read lock is held, so a filter must not call Subscribe or Unsubscribe
// on the same bus.
func (b *Bus) Publish(evt Event) {
	b.mu.RLock()
	defer b.mu.RUnlock()

	for sub, info := range b.subscribers {
		if accept := info.filter; accept != nil && !accept(evt) {
			continue
		}

		select {
		case sub <- evt:
			// Delivered.
		default:
			// Buffer full: drop rather than stall the publisher.
		}
	}
}
// PersistFunc is a callback that persists an event log entry.
//
// Arguments, in order: source, severity, message, and metadata (a JSON
// string). On success it returns the persisted entry's ID and its
// created_at timestamp. When it returns a non-nil error,
// RegisterPersistentLogger logs the failure and does not re-publish the
// entry.
type PersistFunc func(source, severity, message, metadata string) (int64, string, error)
// RegisterPersistentLogger wires automatic persistence of deploy log
// warnings and errors into the bus.
//
// It subscribes to EventDeployLog events whose DeployLogPayload has level
// "warn" or "error" and starts a goroutine that, for each one, calls
// persist with source "deploy", the level as severity, the message, and a
// JSON metadata object holding the deploy_id. Successfully persisted
// entries are published again as EventLog events so SSE clients see them in
// real time; failures are logged and skipped.
//
// The returned function unsubscribes from the bus, which closes the
// subscription channel and lets the goroutine exit.
func (b *Bus) RegisterPersistentLogger(persist PersistFunc) func() {
	const source = "deploy"

	// Only warn/error deploy log lines are persisted.
	wanted := func(evt Event) bool {
		if evt.Type != EventDeployLog {
			return false
		}
		entry, ok := evt.Payload.(DeployLogPayload)
		return ok && (entry.Level == "warn" || entry.Level == "error")
	}
	sub := b.Subscribe(wanted)

	go func() {
		// The loop ends when Unsubscribe closes sub.
		for evt := range sub {
			entry, ok := evt.Payload.(DeployLogPayload)
			if !ok {
				continue
			}

			// Marshaling a map[string]string is not expected to fail,
			// so the error is deliberately discarded.
			raw, _ := json.Marshal(map[string]string{"deploy_id": entry.DeployID})
			metadata := string(raw)

			id, createdAt, err := persist(source, entry.Level, entry.Message, metadata)
			if err != nil {
				slog.Error("failed to persist event log", "source", source, "level", entry.Level, "error", err)
				continue
			}

			// Re-publish the stored entry; the filter above rejects
			// EventLog, so this cannot feed back into this loop.
			stored := EventLogPayload{
				ID:        id,
				Source:    source,
				Severity:  entry.Level,
				Message:   entry.Message,
				Metadata:  metadata,
				CreatedAt: createdAt,
			}
			b.Publish(Event{Type: EventLog, Payload: stored})
		}
	}()

	return func() { b.Unsubscribe(sub) }
}
// MarshalEvent encodes evt as JSON and returns it as a string, ready to be
// used as the data line of a server-sent event. If encoding fails it
// returns an empty string together with the json.Marshal error.
func MarshalEvent(evt Event) (string, error) {
	encoded, marshalErr := json.Marshal(evt)
	if marshalErr != nil {
		return "", marshalErr
	}

	return string(encoded), nil
}