Files
alexei.dolgolyov 7a9ff7ad54 feat(observability): event triggers + log scanner backend
Two paired backends sharing the events.Bus seam:

Event triggers (consumer-side):
- internal/store/event_triggers.go — CRUD with action_secret
  redaction on read (placeholder echo treated as "no change" on
  PATCH so secrets aren't accidentally wiped).
- internal/events/dispatcher.go — bus subscriber, AND-composed
  filters (severity CSV, source CSV, message regex with memoized
  compile cache). Structural loop-prevention: never writes to
  event_log. Sends via notifier.SendPayload.
- internal/notify: SendPayload + SendSyncForTestPayload methods,
  TierEventTrigger constant, doSendRaw shared with the legacy
  Event-shaped path.
- internal/api/event_triggers.go — admin-gated CRUD + /test
  sending the real TriggerWebhookPayload shape. SSRF guard
  rejects loopback / link-local / unspecified targets. PATCH
  uses pointer-typed DTO for partial updates.

Log scanner (producer-side):
- internal/logscanner/ — engine (per-rule cooldown +
  per-container token bucket, atomic drop counters), tail
  (multiplexed docker frame demuxer with TTY fallback + 16 MiB
  payload cap + 1 MiB reassembly cap + RFC3339Nano-validated
  timestamp strip + UTF-8-safe message truncation), manager
  (5s container polling, atomic.Pointer[Snapshot] hot-reload,
  HitEmitter writes event_log + publishes EventLog so the
  trigger dispatcher picks them up immediately).
- internal/docker/container.go — ContainerLogsOpts exposes
  stream selection for stderr-only / stdout-only rules.
- internal/store: log_scan_rules table + CRUD with
  EffectiveLogScanRules resolver (globals minus per-workload
  overrides plus workload-only additions). Transactional
  cascade-delete of overrides when a global rule is removed.
- internal/api/log_scan_rules.go — admin-gated CRUD + /test
  (sample_line → matched/captures) + /stats (drop counters +
  active tail count + last-snapshot compile errors) +
  GET /api/workloads/{id}/effective-rules.

cmd/server/main.go wires both subsystems next to the existing
RegisterPersistentLogger. Coverage spans engine cooldown / bucket
counter tests, snapshot effective-set semantics, manager compile-
error capture, dispatcher matching, store validation +
cascade-delete, API URL validator + secret redaction.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-11 22:18:11 +03:00

459 lines
14 KiB
Go

package docker
import (
"context"
"fmt"
"io"
"regexp"
"strconv"
"strings"
"github.com/moby/moby/api/types/container"
"github.com/moby/moby/api/types/mount"
"github.com/moby/moby/api/types/network"
"github.com/moby/moby/client"
)
// ContainerConfig holds all parameters needed to create a managed container.
// It is the input to Client.CreateContainer. The optional fields
// (NetworkName, WorkloadID, WorkloadKind, Role, CpuLimit, MemoryLimit) are
// skipped when left at their zero value.
type ContainerConfig struct {
// Name is the container name (deterministic: dw-{project}-{stage}-{tag}).
Name string
// Image is the full image reference including tag (e.g. "myapp:v1.2.3").
Image string
// Env is a list of environment variables in "KEY=VALUE" format.
Env []string
// ExposedPorts lists the container ports to publish (e.g. ["8080/tcp"]).
// Each port is mapped to a random host port via Docker auto-assignment.
ExposedPorts []string
// NetworkName is the Docker network to attach the container to at creation.
// Empty means no networking config is sent with the create request.
NetworkName string
// NetworkID is the ID of the Docker network (used for endpoint config).
// It is only read when NetworkName is non-empty.
NetworkID string
// Labels are additional labels to apply to the container.
// Tinyforge management labels (the managed marker plus the labels derived
// from WorkloadID, WorkloadKind, and Role) are added automatically and
// overwrite any entry here that uses the same key.
Labels map[string]string
// WorkloadID is the unifying primitive's row ID (Workload.ID). Future
// reconciler / global views key off this label, so it must be set on
// every Tinyforge-managed container (project, stack, site).
WorkloadID string
// WorkloadKind is 'project' | 'stack' | 'site'. Denormalized here so
// label-selector queries don't need to join through workloads.
WorkloadKind string
// Role is the per-kind sub-identifier: stage name for projects, service
// name for stacks, empty for sites. Used by the reconciler to upsert.
Role string
// Mounts is a list of bind mounts to attach to the container.
Mounts []mount.Mount
// CpuLimit is the CPU limit in cores (e.g., 0.5, 1, 2). 0 = unlimited.
// Values that are not greater than zero are treated as unlimited.
CpuLimit float64
// MemoryLimit is the memory limit in megabytes (converted to bytes as
// MemoryLimit * 1024 * 1024). 0 = unlimited; values that are not greater
// than zero are treated as unlimited.
MemoryLimit int
}
// invalidNameChars matches every character that is not allowed in a Docker
// container name component (anything other than ASCII letters, digits,
// underscore, dot, and hyphen). Letter case is left untouched.
var invalidNameChars = regexp.MustCompile(`[^a-zA-Z0-9_.-]`)

// ContainerName builds the deterministic container name
// "dw-{project}-{stage}-{tag}".
//
// Each component is sanitized independently: every disallowed character is
// replaced with a hyphen, then leading and trailing hyphens are trimmed. A
// component that sanitizes to the empty string is kept as an empty segment,
// so the separators around it remain in the result.
func ContainerName(project, stage, tag string) string {
	parts := []string{project, stage, tag}
	for i, raw := range parts {
		hyphenated := invalidNameChars.ReplaceAllString(raw, "-")
		parts[i] = strings.Trim(hyphenated, "-")
	}
	return fmt.Sprintf("dw-%s-%s-%s", parts[0], parts[1], parts[2])
}
// CreateContainer issues a Docker create request for the container described
// by cfg and returns the new container's ID. It does not start the container.
//
// Every entry of cfg.ExposedPorts is exposed and bound to an auto-assigned
// host port. cfg.Labels is copied first and the Tinyforge labels are written
// afterwards, so the managed marker and any non-empty workload labels win
// over caller-supplied entries with the same key. When cfg.NetworkName is set
// the container is attached to that network as part of the create call.
//
// An error is returned when a port spec cannot be parsed or when the create
// call itself fails.
func (c *Client) CreateContainer(ctx context.Context, cfg ContainerConfig) (string, error) {
	// Expose each requested port and leave the host side to Docker.
	exposed := network.PortSet{}
	bindings := network.PortMap{}
	for _, spec := range cfg.ExposedPorts {
		parsed, err := network.ParsePort(spec)
		if err != nil {
			return "", fmt.Errorf("parse port %s: %w", spec, err)
		}
		exposed[parsed] = struct{}{}
		// An empty HostPort asks the daemon to pick a free port.
		bindings[parsed] = []network.PortBinding{{HostPort: ""}}
	}

	// Caller labels go in first; the canonical Tinyforge labels are applied
	// on top so they cannot be overridden.
	merged := make(map[string]string)
	for key, val := range cfg.Labels {
		merged[key] = val
	}
	merged[LabelManaged] = "true"
	optional := []struct{ key, value string }{
		{LabelWorkloadID, cfg.WorkloadID},
		{LabelWorkloadKind, cfg.WorkloadKind},
		{LabelRole, cfg.Role},
	}
	for _, label := range optional {
		if label.value != "" {
			merged[label.key] = label.value
		}
	}

	// Networking config stays nil unless a network was requested.
	var netCfg *network.NetworkingConfig
	if cfg.NetworkName != "" {
		endpoint := &network.EndpointSettings{NetworkID: cfg.NetworkID}
		netCfg = &network.NetworkingConfig{
			EndpointsConfig: map[string]*network.EndpointSettings{
				cfg.NetworkName: endpoint,
			},
		}
	}

	created, err := c.api.ContainerCreate(ctx, client.ContainerCreateOptions{
		Config: &container.Config{
			Image:        cfg.Image,
			Env:          cfg.Env,
			ExposedPorts: exposed,
			Labels:       merged,
		},
		HostConfig: &container.HostConfig{
			PortBindings:  bindings,
			RestartPolicy: container.RestartPolicy{Name: container.RestartPolicyDisabled},
			Mounts:        cfg.Mounts,
			Resources:     containerResources(cfg.CpuLimit, cfg.MemoryLimit),
		},
		NetworkingConfig: netCfg,
		Name:             cfg.Name,
	})
	if err != nil {
		return "", fmt.Errorf("create container %s: %w", cfg.Name, err)
	}
	return created.ID, nil
}
// containerResources translates a CPU limit in cores and a memory limit in
// megabytes into Docker resource constraints. A limit that is not greater
// than zero leaves the corresponding field at its zero value (no limit set).
func containerResources(cpuLimit float64, memoryLimitMB int) container.Resources {
	const (
		// NanoCPUs is expressed in units of 1e-9 CPUs: one core is 1e9.
		nanoCPUsPerCore = 1e9
		bytesPerMB      = 1024 * 1024
	)

	var res container.Resources
	if cpuLimit > 0 {
		res.NanoCPUs = int64(cpuLimit * nanoCPUsPerCore)
	}
	if memoryLimitMB > 0 {
		res.Memory = int64(memoryLimitMB) * bytesPerMB
	}
	return res
}
// StartContainer asks the daemon to start the container identified by
// containerID. A failure is returned wrapped with the container ID.
func (c *Client) StartContainer(ctx context.Context, containerID string) error {
	_, err := c.api.ContainerStart(ctx, containerID, client.ContainerStartOptions{})
	if err != nil {
		return fmt.Errorf("start container %s: %w", containerID, err)
	}
	return nil
}
// StopContainer stops a container, passing timeoutSeconds as the stop
// timeout. When timeoutSeconds is zero or negative no timeout is sent, so
// the Docker default applies.
func (c *Client) StopContainer(ctx context.Context, containerID string, timeoutSeconds int) error {
	var stopOpts client.ContainerStopOptions
	if timeoutSeconds > 0 {
		// Only an explicit positive value overrides the daemon default.
		stopOpts.Timeout = &timeoutSeconds
	}

	_, err := c.api.ContainerStop(ctx, containerID, stopOpts)
	if err != nil {
		return fmt.Errorf("stop container %s: %w", containerID, err)
	}
	return nil
}
// RemoveContainer deletes a container. The force flag is forwarded to the
// daemon unchanged (true allows removing a container that is still running).
// The RemoveVolumes option is always set on the request.
func (c *Client) RemoveContainer(ctx context.Context, containerID string, force bool) error {
	_, err := c.api.ContainerRemove(ctx, containerID, client.ContainerRemoveOptions{
		RemoveVolumes: true,
		Force:         force,
	})
	if err != nil {
		return fmt.Errorf("remove container %s: %w", containerID, err)
	}
	return nil
}
// RestartContainer restarts a container, passing timeoutSeconds as the
// timeout for the restart request. When timeoutSeconds is zero or negative
// no timeout is sent, so the Docker default applies.
func (c *Client) RestartContainer(ctx context.Context, containerID string, timeoutSeconds int) error {
	var restartOpts client.ContainerRestartOptions
	if timeoutSeconds > 0 {
		// Only an explicit positive value overrides the daemon default.
		restartOpts.Timeout = &timeoutSeconds
	}

	_, err := c.api.ContainerRestart(ctx, containerID, restartOpts)
	if err != nil {
		return fmt.Errorf("restart container %s: %w", containerID, err)
	}
	return nil
}
// ManagedContainer holds summary information about a container managed by Tinyforge.
// WorkloadID/Kind/Role are pulled from the canonical Tinyforge labels.
// Values are produced by Client.ListContainers from a single list call.
type ManagedContainer struct {
// ID is the container ID from the list entry.
ID string
// Name is the first reported container name with Docker's leading "/"
// removed; empty when the list entry carries no names.
Name string
// Image is the image field of the list entry, copied verbatim.
Image string
// Status is the list entry's Status string, copied verbatim.
Status string
// State is the list entry's State converted to a plain string.
State string
// WorkloadID is the value of the workload-ID label; empty when absent.
WorkloadID string
// WorkloadKind is the value of the workload-kind label; empty when absent.
WorkloadKind string
// Role is the value of the role label; empty when absent.
Role string
// Ports lists the non-zero public (host) ports of the list entry; nil
// when the container publishes none.
Ports []uint16
}
// ListContainers lists every container (running or not) that carries the
// Tinyforge managed label, optionally narrowed by labelFilters.
//
// Each labelFilters entry becomes one extra label filter: "key=value" when
// the value is non-empty, or just "key" (label present with any value) when
// it is empty. The workload labels are copied into the result so callers can
// dispatch or display without a follow-up inspect call.
func (c *Client) ListContainers(ctx context.Context, labelFilters map[string]string) ([]ManagedContainer, error) {
	// The managed marker is always required; caller filters only narrow.
	query := make(client.Filters)
	query.Add("label", LabelManaged+"=true")
	for key, want := range labelFilters {
		expr := key
		if want != "" {
			expr = key + "=" + want
		}
		query.Add("label", expr)
	}

	listed, err := c.api.ContainerList(ctx, client.ContainerListOptions{
		All:     true,
		Filters: query,
	})
	if err != nil {
		return nil, fmt.Errorf("list containers: %w", err)
	}

	managed := make([]ManagedContainer, 0, len(listed.Items))
	for _, item := range listed.Items {
		entry := ManagedContainer{
			ID:           item.ID,
			Image:        item.Image,
			Status:       item.Status,
			State:        string(item.State),
			WorkloadID:   item.Labels[LabelWorkloadID],
			WorkloadKind: item.Labels[LabelWorkloadKind],
			Role:         item.Labels[LabelRole],
		}
		if len(item.Names) > 0 {
			// Docker prefixes names with "/".
			entry.Name = strings.TrimPrefix(item.Names[0], "/")
		}
		// Keep only ports that are actually published on the host.
		for _, binding := range item.Ports {
			if binding.PublicPort > 0 {
				entry.Ports = append(entry.Ports, binding.PublicPort)
			}
		}
		managed = append(managed, entry)
	}
	return managed, nil
}
// ReconcileItem is a fat container summary aimed at the reconciler — it
// exposes the full label map so the caller can dispatch by workload labels,
// legacy labels, or compose labels without re-inspecting.
// Values are produced by Client.ListAllForReconciler.
type ReconcileItem struct {
// ID is the container ID from the list entry.
ID string
// Name is the first reported container name with Docker's leading "/"
// removed; empty when the list entry carries no names.
Name string
// Image is the image field of the list entry, copied verbatim.
Image string
// State is the list entry's State converted to a plain string.
State string
// Status is the list entry's Status string, copied verbatim.
Status string
// Labels is the container's label map as returned by the list call. It is
// stored as-is, not copied.
Labels map[string]string
// Ports lists the non-zero public (host) ports of the list entry; nil
// when the container publishes none.
Ports []uint16
}
// ListAllForReconciler returns every container known to the daemon (running
// or not) that isTinyforgeManaged accepts, i.e. one marked by ANY of:
//   - tinyforge.managed (canonical — every project, stack, site we own)
//   - tinyforge.static-site (sites that predate the workload labels)
//   - com.docker.compose.project starting with "tinyforge-" (stacks)
//
// The Docker API does not support OR'd label filters, so the full container
// list is fetched unfiltered and narrowed in-process. On a small/medium
// daemon this is cheap; the reconciler runs on a 30s tick.
func (c *Client) ListAllForReconciler(ctx context.Context) ([]ReconcileItem, error) {
	listed, err := c.api.ContainerList(ctx, client.ContainerListOptions{All: true})
	if err != nil {
		return nil, fmt.Errorf("list containers: %w", err)
	}

	items := make([]ReconcileItem, 0, len(listed.Items))
	for _, entry := range listed.Items {
		if !isTinyforgeManaged(entry.Labels) {
			continue
		}

		item := ReconcileItem{
			ID:     entry.ID,
			Image:  entry.Image,
			State:  string(entry.State),
			Status: entry.Status,
			Labels: entry.Labels,
		}
		if len(entry.Names) > 0 {
			// Docker prefixes names with "/".
			item.Name = strings.TrimPrefix(entry.Names[0], "/")
		}
		// Keep only ports that are actually published on the host.
		for _, binding := range entry.Ports {
			if binding.PublicPort > 0 {
				item.Ports = append(item.Ports, binding.PublicPort)
			}
		}
		items = append(items, item)
	}
	return items, nil
}
// isTinyforgeManaged reports whether labels mark a container as belonging to
// Tinyforge under any supported labelling scheme: the managed label set to
// "true", the presence of the "tinyforge.static-site" label (any value), or
// a compose project label whose value starts with "tinyforge-".
// A nil or empty label map is never managed.
func isTinyforgeManaged(labels map[string]string) bool {
	if len(labels) == 0 {
		return false
	}

	_, legacySite := labels["tinyforge.static-site"]
	// A missing compose label reads as "", which never has the prefix.
	composeProject := labels["com.docker.compose.project"]

	return labels[LabelManaged] == "true" ||
		legacySite ||
		strings.HasPrefix(composeProject, "tinyforge-")
}
// ContainerLogs opens a log stream for a container with both stdout and
// stderr enabled.
//
// follow keeps the stream open for new log lines; tail is the number of
// lines from the end to return (e.g., "200"). It delegates to
// ContainerLogsOpts, which is the entry point to use for stream-selective
// reads such as the log scanner narrowing to stderr-only.
func (c *Client) ContainerLogs(ctx context.Context, containerID string, follow bool, tail string) (io.ReadCloser, error) {
	bothStreams := ContainerLogOptions{ShowStdout: true, ShowStderr: true}
	bothStreams.Follow = follow
	bothStreams.Tail = tail
	return c.ContainerLogsOpts(ctx, containerID, bothStreams)
}
// ContainerLogOptions controls which streams are pulled from a container and
// how (follow / tail). It extends the legacy ContainerLogs parameters with
// per-stream selection so the log scanner can read stderr-only rules without
// post-filtering every line.
type ContainerLogOptions struct {
// Follow keeps the stream open for new log lines when true.
Follow bool
// Tail is the number of lines from the end to return (e.g. "200"); it is
// forwarded to the Docker client unchanged.
Tail string
// ShowStdout includes the container's stdout in the stream.
ShowStdout bool
// ShowStderr includes the container's stderr in the stream.
// At least one of ShowStdout / ShowStderr must be true, otherwise
// ContainerLogsOpts returns an error.
ShowStderr bool
}
// ContainerLogsOpts is the stream-selectable counterpart to ContainerLogs.
//
// Selecting neither stdout nor stderr would make the upstream client return
// an empty stream; that is treated as a caller error and reported explicitly
// instead of being a silent no-op. Timestamps are always requested on the
// underlying log call.
func (c *Client) ContainerLogsOpts(ctx context.Context, containerID string, opts ContainerLogOptions) (io.ReadCloser, error) {
	if !(opts.ShowStdout || opts.ShowStderr) {
		return nil, fmt.Errorf("container logs %s: at least one of ShowStdout/ShowStderr must be true", containerID)
	}

	request := client.ContainerLogsOptions{
		Timestamps: true,
		Follow:     opts.Follow,
		Tail:       opts.Tail,
		ShowStdout: opts.ShowStdout,
		ShowStderr: opts.ShowStderr,
	}
	stream, err := c.api.ContainerLogs(ctx, containerID, request)
	if err != nil {
		return nil, fmt.Errorf("container logs %s: %w", containerID, err)
	}
	return stream, nil
}
// IsContainerRunning reports whether the container is currently running,
// based on a single inspect call.
//
// It returns false with a nil error when the inspect response carries no
// state. An inspect failure is returned wrapped with the container ID (the
// cause stays reachable through errors.Is / errors.As), matching the error
// style of the other methods in this file.
func (c *Client) IsContainerRunning(ctx context.Context, containerID string) (bool, error) {
	inspectResult, err := c.api.ContainerInspect(ctx, containerID, client.ContainerInspectOptions{})
	if err != nil {
		return false, fmt.Errorf("inspect container %s: %w", containerID, err)
	}
	state := inspectResult.Container.State
	return state != nil && state.Running, nil
}
// InspectContainerPort returns the host port mapped to a given container
// port (e.g. "8080/tcp"). This is useful after starting a container with
// auto-assigned ports.
//
// The first binding whose host port parses to a non-zero value wins. An
// error is returned when containerPort cannot be parsed, the inspect call
// fails, the inspect response has no network settings, the port has no
// bindings, or none of the bindings carries a usable host port.
func (c *Client) InspectContainerPort(ctx context.Context, containerID string, containerPort string) (uint16, error) {
	// Validate the port spec first so malformed input fails without a round
	// trip to the daemon.
	port, err := network.ParsePort(containerPort)
	if err != nil {
		return 0, fmt.Errorf("parse container port %s: %w", containerPort, err)
	}

	inspectResult, err := c.api.ContainerInspect(ctx, containerID, client.ContainerInspectOptions{})
	if err != nil {
		return 0, fmt.Errorf("inspect container %s: %w", containerID, err)
	}

	// Guard against a response without network settings instead of
	// dereferencing a nil pointer.
	settings := inspectResult.Container.NetworkSettings
	if settings == nil {
		return 0, fmt.Errorf("container %s: no network settings in inspect response", containerID)
	}

	// A missing key yields a nil slice, so the length check covers both the
	// "port unknown" and "port known but unbound" cases.
	bindings := settings.Ports[port]
	if len(bindings) == 0 {
		return 0, fmt.Errorf("container %s: no binding for port %s", containerID, containerPort)
	}

	for _, b := range bindings {
		// parsePort returns 0 for empty or invalid values, which are skipped.
		if hostPort := parsePort(b.HostPort); hostPort > 0 {
			return hostPort, nil
		}
	}
	return 0, fmt.Errorf("container %s: no host port for %s", containerID, containerPort)
}
// parsePort converts a base-10 port string to uint16. It returns 0 when s is
// empty, not a plain unsigned number, or does not fit in 16 bits.
func parsePort(s string) uint16 {
	if n, err := strconv.ParseUint(s, 10, 16); err == nil {
		return uint16(n)
	}
	return 0
}