feat(apps): per-app deploy/activity timeline

Every deploy across all four source kinds now writes a workload-scoped
event via a shared plugin.EmitDeployEvent helper (replacing the inline
emit duplicated in static/dockerfile, standardizing static's metadata
key site_id->workload_id, and adding emission to image+compose which
were silent). New indexed event_log.workload_id column, EventLogFilter
.WorkloadID, and GET /api/workloads/{id}/events (id pinned from path).

Frontend: a forge "Activity" panel on /apps/[id] reusing EventLogEntry,
live SSE prepend filtered by workload_id, load-more pagination, an
All/Errors severity filter, and a shared toEventLogEntry mapper. en/ru
i18n parity.

Security: compose's failure status emits a generic reason instead of raw
`docker compose up` output, which can echo app secrets and egresses to
operator webhooks (NotificationURL + event-trigger actions); full detail
stays only in the returned error. Rune-safe 256-rune status cap.

Reviewed: go + typescript APPROVE; security HIGH fixed.
This commit is contained in:
2026-05-29 13:51:17 +03:00
parent 3071cda512
commit 93b6911b34
19 changed files with 814 additions and 223 deletions
+103
View File
@@ -0,0 +1,103 @@
package plugin
import (
"encoding/json"
"fmt"
"log/slog"
"strings"
"github.com/alexei/tinyforge/internal/events"
"github.com/alexei/tinyforge/internal/store"
)
// maxDeployStatusRunes bounds the persisted status. This is a defense-in-depth
// BACKSTOP, not a sanitizer.
//
// CALLER CONTRACT: deploy events are persisted indefinitely, rendered in the
// per-app timeline, AND egress off-box — error-severity events are forwarded
// to the global NotificationURL (cmd/server) and to operator-configured
// event-trigger webhooks (internal/events/dispatcher). Callers MUST therefore
// keep secrets and raw subprocess output (e.g. `docker compose` combined
// stderr, which can echo the deployed app's own secret-bearing logs) OUT of
// `status`; emit a curated, secret-free reason and keep verbose detail only in
// the returned error (server logs + admin deploy result, neither of which
// egresses). The cap below merely bounds blast radius if something slips
// through — 256 runes keeps a meaningful reason without letting a status
// become an unbounded sink.
const maxDeployStatusRunes = 256
// capDeployStatus truncates s to maxDeployStatusRunes runes, appending an
// ellipsis when it had to cut. Operating on the rune slice keeps the cut on
// a UTF-8 boundary so multibyte output can't be sliced mid-rune.
func capDeployStatus(s string) string {
runes := []rune(s)
if len(runes) <= maxDeployStatusRunes {
return s
}
return string(runes[:maxDeployStatusRunes]) + "…"
}
// EmitDeployEvent records a workload-scoped deploy event in the event log
// and publishes it on the bus. Best-effort: logs and returns on failure,
// never blocks or fails the deploy. `source` is the per-kind event source
// string ("image","compose","static_site","dockerfile"); `status` is a
// short human status ("deploying","deployed","failed: <reason>").
//
// The metadata always carries workload_id so the per-app activity timeline
// can be reconstructed even by consumers that only read the JSON blob, and
// the dedicated workload_id column powers the indexed per-workload query.
func EmitDeployEvent(deps Deps, w Workload, source, status string) {
// Audit logging is best-effort and must never crash a real deploy. The
// production Deps always wires both, but guard so a missing bus/store
// (e.g. a narrow unit test) degrades to a no-op instead of a panic.
if deps.Store == nil || deps.Events == nil {
return
}
// Derive severity from the raw status prefix BEFORE capping, then bound
// the status that actually gets persisted/displayed/published.
severity := "info"
if strings.HasPrefix(status, "failed") {
severity = "error"
}
status = capDeployStatus(status)
message := fmt.Sprintf("%s: %s", w.Name, status)
metaBytes, err := json.Marshal(map[string]string{
"workload_id": w.ID,
"workload_name": w.Name,
"status": status,
})
if err != nil {
slog.Error("plugin: marshal deploy event metadata",
"source", source, "workload", w.ID, "error", err)
metaBytes = []byte("{}")
}
metadata := string(metaBytes)
evt, err := deps.Store.InsertEvent(store.EventLog{
Source: source,
Severity: severity,
Message: message,
Metadata: metadata,
WorkloadID: w.ID,
})
if err != nil {
slog.Error("plugin: failed to persist deploy event log",
"source", source, "workload", w.ID, "error", err)
return
}
deps.Events.Publish(events.Event{
Type: events.EventLog,
Payload: events.EventLogPayload{
ID: evt.ID,
Source: source,
WorkloadID: w.ID,
Severity: severity,
Message: message,
Metadata: metadata,
CreatedAt: evt.CreatedAt,
},
})
}
+167
View File
@@ -0,0 +1,167 @@
package plugin
import (
"encoding/json"
"strings"
"testing"
"unicode/utf8"
"github.com/alexei/tinyforge/internal/events"
"github.com/alexei/tinyforge/internal/store"
)
// capturePublisher records every event published on it so a test can
// assert on the bus payload. Satisfies plugin.EventPublisher.
type capturePublisher struct {
events []events.Event
}
func (c *capturePublisher) Publish(evt events.Event) {
c.events = append(c.events, evt)
}
// newEmitDeps builds a plugin.Deps backed by an in-memory store and a
// capturing publisher. Mirrors the in-memory store pattern used by the
// store + source-plugin tests.
func newEmitDeps(t *testing.T) (Deps, *capturePublisher) {
t.Helper()
st, err := store.New(":memory:")
if err != nil {
t.Fatalf("open store: %v", err)
}
t.Cleanup(func() { _ = st.Close() })
pub := &capturePublisher{}
return Deps{Store: st, Events: pub}, pub
}
func TestEmitDeployEvent(t *testing.T) {
tests := []struct {
name string
status string
wantSeverity string
}{
{name: "deployed is info", status: "deployed", wantSeverity: "info"},
{name: "deploying is info", status: "deploying", wantSeverity: "info"},
{name: "failed is error", status: "failed: pull foo failed", wantSeverity: "error"},
{name: "failed bare is error", status: "failed", wantSeverity: "error"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
deps, pub := newEmitDeps(t)
w := Workload{ID: "wl-123", Name: "my-app"}
EmitDeployEvent(deps, w, "image", tt.status)
// Persisted row carries the workload scope + derived severity.
rows, err := deps.Store.ListEvents(store.EventLogFilter{WorkloadID: w.ID})
if err != nil {
t.Fatalf("ListEvents: %v", err)
}
if len(rows) != 1 {
t.Fatalf("got %d persisted events, want 1", len(rows))
}
got := rows[0]
if got.Severity != tt.wantSeverity {
t.Errorf("severity = %q, want %q", got.Severity, tt.wantSeverity)
}
if got.Source != "image" {
t.Errorf("source = %q, want %q", got.Source, "image")
}
if got.WorkloadID != w.ID {
t.Errorf("workload_id = %q, want %q", got.WorkloadID, w.ID)
}
wantMsg := w.Name + ": " + tt.status
if got.Message != wantMsg {
t.Errorf("message = %q, want %q", got.Message, wantMsg)
}
// Metadata JSON carries workload_id / workload_name / status.
var meta map[string]string
if err := json.Unmarshal([]byte(got.Metadata), &meta); err != nil {
t.Fatalf("unmarshal metadata %q: %v", got.Metadata, err)
}
if meta["workload_id"] != w.ID {
t.Errorf("metadata workload_id = %q, want %q", meta["workload_id"], w.ID)
}
if meta["workload_name"] != w.Name {
t.Errorf("metadata workload_name = %q, want %q", meta["workload_name"], w.Name)
}
if meta["status"] != tt.status {
t.Errorf("metadata status = %q, want %q", meta["status"], tt.status)
}
// The persisted row is also re-published on the bus as an
// EventLog so SSE clients see it live.
if len(pub.events) != 1 {
t.Fatalf("got %d published events, want 1", len(pub.events))
}
ev := pub.events[0]
if ev.Type != events.EventLog {
t.Errorf("event type = %q, want %q", ev.Type, events.EventLog)
}
payload, ok := ev.Payload.(events.EventLogPayload)
if !ok {
t.Fatalf("payload type = %T, want events.EventLogPayload", ev.Payload)
}
if payload.WorkloadID != w.ID {
t.Errorf("payload workload_id = %q, want %q", payload.WorkloadID, w.ID)
}
if payload.Severity != tt.wantSeverity {
t.Errorf("payload severity = %q, want %q", payload.Severity, tt.wantSeverity)
}
if payload.ID != got.ID {
t.Errorf("payload id = %d, want %d", payload.ID, got.ID)
}
})
}
}
// TestEmitDeployEvent_CapsLongStatus verifies a long failure status (e.g. one
// embedding raw subprocess output) is bounded to maxDeployStatusRunes runes in
// both the persisted message and metadata, cut on a UTF-8 boundary, while
// severity is still derived from the original "failed" prefix.
func TestEmitDeployEvent_CapsLongStatus(t *testing.T) {
deps, pub := newEmitDeps(t)
w := Workload{ID: "wl-cap", Name: "app"}
// Multibyte body so a naive byte-slice would corrupt a rune; prefix with
// "failed: " so the severity check exercises the pre-cap derivation.
longStatus := "failed: " + strings.Repeat("é", 400)
EmitDeployEvent(deps, w, "compose", longStatus)
rows, err := deps.Store.ListEvents(store.EventLogFilter{WorkloadID: w.ID})
if err != nil {
t.Fatalf("ListEvents: %v", err)
}
if len(rows) != 1 {
t.Fatalf("got %d events, want 1", len(rows))
}
got := rows[0]
if got.Severity != "error" {
t.Errorf("severity = %q, want error (derived from pre-cap prefix)", got.Severity)
}
var meta map[string]string
if err := json.Unmarshal([]byte(got.Metadata), &meta); err != nil {
t.Fatalf("unmarshal metadata: %v", err)
}
capped := meta["status"]
if rc := len([]rune(capped)); rc != maxDeployStatusRunes+1 { // +1 for the ellipsis rune
t.Errorf("capped status = %d runes, want %d", rc, maxDeployStatusRunes+1)
}
if !utf8.ValidString(capped) {
t.Errorf("capped status is not valid UTF-8: %q", capped)
}
if !strings.HasSuffix(capped, "…") {
t.Errorf("capped status missing ellipsis suffix: %q", capped)
}
wantMsg := w.Name + ": " + capped
if got.Message != wantMsg {
t.Errorf("message = %q, want %q", got.Message, wantMsg)
}
if len(pub.events) != 1 {
t.Fatalf("got %d published events, want 1", len(pub.events))
}
}
@@ -84,7 +84,7 @@ func (*source) Validate(cfg json.RawMessage) error {
// `docker compose -p <project> up -d`, then syncs one Container row per
// service. The workload ID is the natural compose project name unless
// the user supplied one explicitly.
func (*source) Deploy(ctx context.Context, deps plugin.Deps, w plugin.Workload, intent plugin.DeploymentIntent) error {
func (*source) Deploy(ctx context.Context, deps plugin.Deps, w plugin.Workload, intent plugin.DeploymentIntent) (err error) {
cfg, err := plugin.SourceConfigOf[Config](w)
if err != nil {
return fmt.Errorf("compose source: decode config: %w", err)
@@ -93,6 +93,29 @@ func (*source) Deploy(ctx context.Context, deps plugin.Deps, w plugin.Workload,
return fmt.Errorf("compose source: workload %s has empty compose_yaml", w.ID)
}
// compose.Deploy has no idempotency short-circuit (no "already up"
// fast path that returns nil), so every call past config validation
// is a real deploy. Arm the terminal audit emit here — after pure
// config-validation errors above (kept quiet, mirroring the image
// plugin) but before any real work — so all real failures and the
// success are captured for the per-app timeline. err is the named
// return.
defer func() {
if err != nil {
// SECURITY: the compose.Up failure wraps raw `docker compose`
// combined output (which can include the deployed app's own
// stderr — potentially secrets). Deploy events are persisted
// indefinitely AND egress to operator webhooks (the global
// NotificationURL + event-trigger actions), so the emitted
// status must NOT carry that output. The full detail still
// reaches the server log + admin deploy result via the returned
// err; the timeline records only a generic, secret-free reason.
plugin.EmitDeployEvent(deps, w, "compose", "failed")
} else {
plugin.EmitDeployEvent(deps, w, "compose", "deployed")
}
}()
projectName := composeProjectName(cfg.ComposeProjectName, w)
yamlPath, err := writeYAML(w.ID, cfg.ComposeYAML)
if err != nil {
@@ -2,7 +2,6 @@ package dockerfile
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
@@ -506,49 +505,13 @@ func dispatchBuildNotification(deps plugin.Deps, w plugin.Workload, domain, stat
})
}
// publishEvent emits a status event on the bus AND persists an
// event_log row. Message shape mirrors the static plugin
// ("Build %q: %s") so the dashboard's audit feed reads consistently
// across both kinds.
// publishEvent records a workload-scoped deploy event in the audit log.
// The InsertEvent + bus publish (and consistent message/metadata shape
// across source kinds) is centralised in plugin.EmitDeployEvent so the
// dashboard's audit feed and the per-workload timeline read identically
// for image / compose / static / dockerfile deploys.
func publishEvent(deps plugin.Deps, w plugin.Workload, status string) {
severity := "info"
if strings.HasPrefix(status, "failed") {
severity = "error"
}
message := fmt.Sprintf("Build %q: %s", w.Name, status)
metaBytes, err := json.Marshal(map[string]string{
"workload_id": w.ID,
"workload_name": w.Name,
"status": status,
})
if err != nil {
slog.Error("dockerfile: marshal event metadata", "error", err)
metaBytes = []byte("{}")
}
metadata := string(metaBytes)
evt, err := deps.Store.InsertEvent(store.EventLog{
Source: "dockerfile",
Severity: severity,
Message: message,
Metadata: metadata,
})
if err != nil {
slog.Error("dockerfile: failed to persist event log", "error", err)
return
}
deps.Events.Publish(events.Event{
Type: events.EventLog,
Payload: events.EventLogPayload{
ID: evt.ID,
Source: "dockerfile",
Severity: severity,
Message: message,
Metadata: metadata,
CreatedAt: evt.CreatedAt,
},
})
plugin.EmitDeployEvent(deps, w, "dockerfile", status)
}
// publishBuildLog emits one EventBuildLog per non-empty daemon "stream"
+30 -32
View File
@@ -118,7 +118,7 @@ func (*source) Validate(cfg json.RawMessage) error {
//
// Any failure between create and face-registration rolls back the new
// container + its row; old serving state is preserved.
func (*source) Deploy(ctx context.Context, deps plugin.Deps, w plugin.Workload, intent plugin.DeploymentIntent) error {
func (*source) Deploy(ctx context.Context, deps plugin.Deps, w plugin.Workload, intent plugin.DeploymentIntent) (err error) {
cfg, err := plugin.SourceConfigOf[Config](w)
if err != nil {
return fmt.Errorf("image source: decode config: %w", err)
@@ -162,6 +162,19 @@ func (*source) Deploy(ctx context.Context, deps plugin.Deps, w plugin.Workload,
}
}
// Past the idempotency short-circuit: this is a real deploy. Emit a
// terminal audit event for the per-app timeline. Armed here (not at the
// top) so duplicate-webhook no-ops above don't flood the log, and
// pre-flight config/settings errors above stay quiet. err is the named
// return, so the deferred closure observes the final outcome.
defer func() {
if err != nil {
plugin.EmitDeployEvent(deps, w, "image", "failed: "+err.Error())
} else {
plugin.EmitDeployEvent(deps, w, "image", "deployed")
}
}()
authConfig, err := buildRegistryAuth(deps, cfg.RegistryName)
if err != nil {
return fmt.Errorf("image source: %w", err)
@@ -486,37 +499,22 @@ type containerExtra struct {
ProxyRoutes map[string]string `json:"proxy_routes,omitempty"`
}
// Reconcile syncs the containers index for this workload with reality.
// MVP: just refreshes State from Docker. Future versions can re-deploy
// when the running container disagrees with the desired source config.
func (*source) Reconcile(ctx context.Context, deps plugin.Deps, w plugin.Workload) error {
rows, err := deps.Store.ListContainersByWorkload(w.ID)
if err != nil {
return fmt.Errorf("image source: list containers: %w", err)
}
for _, c := range rows {
if c.ContainerID == "" {
continue
}
running, err := deps.Docker.IsContainerRunning(ctx, c.ContainerID)
if err != nil {
// Most likely "no such container" — mark as missing so the UI
// surfaces it and the next deploy recreates.
if err := deps.Store.UpdateContainerState(c.ID, "missing"); err != nil {
slog.Warn("image source: mark missing", "id", c.ID, "error", err)
}
continue
}
desired := "running"
if !running {
desired = "stopped"
}
if c.State != desired {
if err := deps.Store.UpdateContainerState(c.ID, desired); err != nil {
slog.Warn("image source: state sync", "id", c.ID, "error", err)
}
}
}
// Reconcile is intentionally a no-op for the image source.
//
// State sync is fully handled by the generic reconciler pass that runs
// EARLIER in the same Reconciler.ReconcileOnce: its upsert loop writes each
// present container's State from the single `docker ps -a` snapshot
// (ListAllForReconciler), and its markMissing pass flips rows whose container
// ID is absent from that snapshot to 'missing'. Every image container carries
// the tinyforge.workload.id label (ContainerConfig.WorkloadID at create time),
// so the generic pass covers all of them.
//
// The previous implementation looped this workload's container rows and called
// Docker.IsContainerRunning per row — a redundant Docker inspect per container
// per tick that duplicated work already done from the snapshot and scaled as N
// Docker API calls/tick. Returning nil here drops that cost without changing
// observable state. The method stays because the source interface requires it.
func (*source) Reconcile(context.Context, plugin.Deps, plugin.Workload) error {
return nil
}
@@ -2,14 +2,12 @@ package static
import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"os"
"path/filepath"
"strconv"
"strings"
"time"
"github.com/moby/moby/api/types/mount"
@@ -543,11 +541,13 @@ func dispatchSiteNotification(deps plugin.Deps, w plugin.Workload, domain, statu
})
}
// publishEvent emits a static_site_status event on the bus AND
// persists an event_log row so the dashboard's audit trail picks it
// up. Message format ("Static site \"%s\": %s") is preserved verbatim
// from the legacy Manager.publishEvent so log scrapers and operator-
// configured event triggers keep matching.
// publishEvent emits a static_site_status event on the bus (drives the
// dashboard's per-site status pill) AND records a workload-scoped deploy
// event in the audit log. The audit InsertEvent + bus publish is
// centralised in plugin.EmitDeployEvent so the message/metadata shape and
// per-workload timeline are identical across all source kinds. This
// standardises the metadata key from the legacy "site_id" to "workload_id";
// no consumer reads the old key (verified repo-wide).
func publishEvent(deps plugin.Deps, w plugin.Workload, status string) {
deps.Events.Publish(events.Event{
Type: events.EventStaticSiteStatus,
@@ -558,47 +558,7 @@ func publishEvent(deps plugin.Deps, w plugin.Workload, status string) {
},
})
severity := "info"
if strings.HasPrefix(status, "failed") {
severity = "error"
}
message := fmt.Sprintf("Static site %q: %s", w.Name, status)
// Build metadata via json.Marshal so workload names containing
// quotes or backslashes don't produce invalid JSON for downstream
// log-scan consumers.
metaBytes, err := json.Marshal(map[string]string{
"site_id": w.ID,
"site_name": w.Name,
"status": status,
})
if err != nil {
slog.Error("static site: marshal event metadata", "error", err)
metaBytes = []byte("{}")
}
metadata := string(metaBytes)
evt, err := deps.Store.InsertEvent(store.EventLog{
Source: "static_site",
Severity: severity,
Message: message,
Metadata: metadata,
})
if err != nil {
slog.Error("static site: failed to persist event log", "error", err)
return
}
deps.Events.Publish(events.Event{
Type: events.EventLog,
Payload: events.EventLogPayload{
ID: evt.ID,
Source: "static_site",
Severity: severity,
Message: message,
Metadata: metadata,
CreatedAt: evt.CreatedAt,
},
})
plugin.EmitDeployEvent(deps, w, "static_site", status)
}
// removeContainerByName mirrors the legacy helper: enumerate Docker's