Files
tiny-forge/internal/stats/collector.go
T
alexei.dolgolyov a4362b842d
Build / build (push) Successful in 11m42s
fix: harden security, fix concurrency bugs, and address review findings
Security:
- rate limit /api/webhook routes per-IP and cap concurrent site syncs
- global SSE connection cap (256) with new sse_gate
- validate ?tail= and cap JSON log responses at 4 MiB
- strip ANSI/CSI/OSC and control bytes from streamed log lines
- redact webhook secret from request log middleware
- scrub host details from /api/health for non-admin viewers
- drop container_id from /api/system/stats/top for non-admins
- generate webhook secrets via crypto/rand; require >=32 chars on insert
- verify iid path consistency in streamContainerLogs
- LimitReader on site webhook body; reject malformed non-empty bodies

Concurrency / correctness:
- stats collector: Stop() no longer hangs without Start(), semaphore
  acquired in parent loop so ctx cancellation short-circuits the queue,
  in-flight tick cancellable via shared base context, zero-ts guard
- webhook handler: replace fire-and-forget goroutine with WaitGroup-tracked
  workers + Drain() wired into graceful shutdown
- $derived(() => ...) mis-idiom fixed in ContainerStats / InstanceCard /
  ProjectCard (returned function instead of value)
- SystemResourcesCard: rename `window` and `t` locals to avoid shadowing
  globalThis.window and the i18n `t` import

Quality / performance:
- replace O(n^2) insertion sort with sort.Slice in stats top
- runMigrations only swallows duplicate-column / already-exists errors
- PruneStatsSamplesBefore wrapped in a transaction
- collapse N+1 in unusedImageStats / pruneImages to one ListAllInstances
  pass; surface DB errors instead of silently treating them as inactive
- run Docker Info + DiskUsage in parallel via errgroup
- container log SSE emits `: ping` heartbeat every 20 s
- imageMatches case-insensitive on registry host (RFC behaviour)
- log warning on invalid stage tag pattern instead of silent skip
- reject malformed non-empty site webhook payloads

Frontend / i18n:
- shared formatBytes utility replaces three local copies
- statsInterval store drives dynamic "no samples / collection disabled"
  copy across ContainerStats and SystemResourcesCard
- top consumers row now shows owner_name (project/stage or site name)
- drop seven `as any` casts on the Settings type; add cloudflare_api_token
  write-only field
- move "Service status", "Docker daemon", "Docker unreachable",
  "Proxy unreachable", "reachable", and "Docker daemon is not reachable."
  strings into en/ru i18n bundles
2026-05-07 00:56:14 +03:00

344 lines
9.2 KiB
Go

// Package stats implements a background goroutine that periodically samples
// Docker container and host-level resource usage and persists the samples
// into SQLite. It reads its interval and retention from settings on every
// tick so configuration changes take effect without a restart.
package stats
import (
"context"
"log/slog"
"sync"
"time"
"github.com/alexei/tinyforge/internal/docker"
"github.com/alexei/tinyforge/internal/store"
)
// Fallbacks and bounds for the sampling settings. Within this file the
// Default* values are used only when settings cannot be read at all;
// readable-but-out-of-range values are handled by readConfig instead
// (a positive interval is clamped to [MinIntervalSeconds,
// MaxIntervalSeconds]; a negative interval or retention disables
// collection).
const (
	DefaultIntervalSeconds = 15
	DefaultRetentionHours  = 2

	MinIntervalSeconds = 5
	MaxIntervalSeconds = 300
)

// maxParallelSamples is the hard cap on concurrent per-container stats
// requests, so a host running many containers does not overwhelm the
// Docker daemon.
const maxParallelSamples = 8

// Values written to ContainerStatsSample.OwnerType, telling which kind of
// workload a sampled container belongs to.
const (
	OwnerTypeInstance = "instance"
	OwnerTypeSite     = "site"
)
// Collector runs the background sampling loop. Construct it with New, call
// Start to launch the loop, and Stop to shut it down and wait for it.
type Collector struct {
	// store supplies settings and the instance/site lists, and receives the
	// persisted container/system samples.
	store *store.Store
	// docker is queried for per-container and host-level stats.
	docker *docker.Client
	// startOnce ensures only the first Start call launches run.
	startOnce sync.Once
	// stopOnce ensures Stop's shutdown signalling happens only once.
	stopOnce sync.Once
	// started is set to true by Start just before it launches run.
	started bool
	// stop is closed by Stop to tell run (and, via run's base context, the
	// tick in flight) to exit.
	stop chan struct{}
	// done is closed when run returns, or by Stop itself when run was never
	// started; Stop blocks on it.
	done chan struct{}
}
// New returns a Collector that reads settings and persists samples through
// s and queries container/host stats through d. The collector stays idle
// until Start is called.
func New(s *store.Store, d *docker.Client) *Collector {
	c := &Collector{store: s, docker: d}
	c.stop = make(chan struct{})
	c.done = make(chan struct{})
	return c
}
// Start launches the background sampling loop in its own goroutine and
// returns without waiting. The loop keeps running until Stop is called.
// Only the first call does anything, so calling Start repeatedly is
// harmless.
func (c *Collector) Start() {
	launch := func() {
		c.started = true
		go c.run()
	}
	c.startOnce.Do(launch)
}
// Stop signals the collector to exit and blocks until it has finished the
// in-flight tick. If Start was never called, Stop returns immediately and
// any later Start call becomes a no-op. Stop may be called more than once
// and concurrently with Start.
func (c *Collector) Stop() {
	c.stopOnce.Do(func() {
		// Claim startOnce ourselves. If Start already ran, this is a no-op
		// and run will close c.done when it exits. Otherwise c.done is
		// closed here and, because startOnce is now spent, a later Start
		// can no longer launch run (whose deferred close(c.done) would
		// panic on the already-closed channel). sync.Once also serialises
		// this decision against a concurrent Start, so there is no
		// unsynchronised read of c.started.
		c.startOnce.Do(func() { close(c.done) })
		close(c.stop)
	})
	<-c.done
}
// run is the collector's main loop, launched by Start. It re-reads the
// interval and retention from settings before every tick, so configuration
// changes take effect within one tick without a dedicated reload path. It
// closes c.done when it returns, which happens once Stop closes c.stop.
func (c *Collector) run() {
	defer close(c.done)

	// Cancel baseCtx as soon as Stop closes c.stop, so in-flight Docker
	// requests abort instead of waiting out their own timeouts.
	baseCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	go func() {
		<-c.stop
		cancel()
	}()

	// sleep waits for d and reports whether the loop should continue; it
	// returns false as soon as Stop has been called.
	sleep := func(d time.Duration) bool {
		select {
		case <-time.After(d):
			return true
		case <-c.stop:
			return false
		}
	}

	// Let the rest of the application settle before the first sample.
	if !sleep(3 * time.Second) {
		return
	}

	for {
		interval, retention := c.readConfig()
		if interval == 0 || retention == 0 {
			// Collection is disabled; re-check settings once a minute in
			// case it is switched back on.
			if !sleep(time.Minute) {
				return
			}
			continue
		}

		c.tick(baseCtx, retention)

		if !sleep(time.Duration(interval) * time.Second) {
			return
		}
	}
}
// readConfig returns the sampling interval (seconds) and retention window
// (hours) from settings.
//
//   - If settings cannot be read, it logs a warning and returns
//     DefaultIntervalSeconds and DefaultRetentionHours.
//   - A negative value in either setting yields (0, 0), i.e. disabled.
//   - A zero value is passed through unchanged.
//   - A positive interval is clamped to [MinIntervalSeconds,
//     MaxIntervalSeconds]; retention is returned as-is.
func (c *Collector) readConfig() (intervalSeconds, retentionHours int) {
	settings, err := c.store.GetSettings()
	if err != nil {
		slog.Warn("stats collector: failed to read settings — using defaults", "error", err)
		return DefaultIntervalSeconds, DefaultRetentionHours
	}

	intervalSeconds, retentionHours = settings.StatsIntervalSeconds, settings.StatsRetentionHours
	switch {
	case intervalSeconds < 0 || retentionHours < 0:
		return 0, 0
	case intervalSeconds > MaxIntervalSeconds:
		intervalSeconds = MaxIntervalSeconds
	case intervalSeconds > 0 && intervalSeconds < MinIntervalSeconds:
		intervalSeconds = MinIntervalSeconds
	}
	return intervalSeconds, retentionHours
}
// tick samples all known containers, aggregates workload-level totals,
// persists samples, and prunes rows beyond the retention window. When
// the Docker daemon is unreachable the whole tick is skipped with a
// single debug log instead of one warning per container.
//
// The tick shares a 30 s budget derived from parent. If parent is cancelled
// while sampling (the collector is stopping), the tick returns without
// persisting anything: the batch may be partial, so its workload totals
// would undercount, and the remaining Docker call would run on an
// already-cancelled context. This also keeps Stop from waiting on DB writes.
func (c *Collector) tick(parent context.Context, retentionHours int) {
	ctx, cancel := context.WithTimeout(parent, 30*time.Second)
	defer cancel()

	pingCtx, pingCancel := context.WithTimeout(ctx, 2*time.Second)
	defer pingCancel()
	if err := c.docker.Ping(pingCtx); err != nil {
		slog.Debug("stats collector: docker unreachable, skipping tick", "error", err)
		return
	}

	targets := c.buildTargets()
	if len(targets) == 0 {
		// No containers to sample, but still record a system sample so the
		// host history isn't empty.
		c.recordSystemSample(ctx, 0, 0, 0)
		c.prune(retentionHours)
		return
	}

	samples := c.sampleAll(ctx, targets)
	if parent.Err() != nil {
		// Shutdown in progress; see the function comment for why the
		// (possibly partial) batch is discarded rather than persisted.
		slog.Debug("stats collector: stopping, discarding partial tick", "sampled", len(samples))
		return
	}

	var (
		totalCPU float64
		totalMem int64
		running  int
	)
	for _, s := range samples {
		if err := c.store.InsertContainerStatsSample(s); err != nil {
			slog.Warn("stats collector: insert container sample",
				"container", s.ContainerID, "error", err)
			continue
		}
		// Only persisted samples contribute to the workload totals, keeping
		// the system sample consistent with the stored container rows.
		totalCPU += s.CPUPercent
		totalMem += s.MemoryUsage
		running++
	}
	c.recordSystemSample(ctx, totalCPU, totalMem, running)
	c.prune(retentionHours)
}
// target identifies one container to sample together with the workload that
// owns it. OwnerType is one of the OwnerType* constants; OwnerID is the ID
// of the owning instance or site.
type target struct {
	ContainerID        string
	OwnerType, OwnerID string
}
// buildTargets returns one sampling target for every instance and every
// static site whose ContainerID is non-empty. Instances are listed first,
// then sites. If either store lookup fails, the error is logged and that
// list is skipped; targets from the other list are still returned.
func (c *Collector) buildTargets() []target {
	var targets []target

	if instances, err := c.store.ListAllInstances(); err != nil {
		slog.Warn("stats collector: list instances", "error", err)
	} else {
		for _, inst := range instances {
			if inst.ContainerID != "" {
				targets = append(targets, target{
					ContainerID: inst.ContainerID,
					OwnerType:   OwnerTypeInstance,
					OwnerID:     inst.ID,
				})
			}
		}
	}

	if sites, err := c.store.GetAllStaticSites(); err != nil {
		slog.Warn("stats collector: list sites", "error", err)
	} else {
		for _, site := range sites {
			if site.ContainerID != "" {
				targets = append(targets, target{
					ContainerID: site.ContainerID,
					OwnerType:   OwnerTypeSite,
					OwnerID:     site.ID,
				})
			}
		}
	}

	return targets
}
// sampleAll fetches Docker stats for every target with at most
// maxParallelSamples requests in flight. A failed sample is logged at debug
// level and skipped, so one missing container cannot kill the whole tick.
// Once ctx is cancelled no further requests are started; samples that
// already completed are still returned. The result keeps the order of
// targets, minus targets that failed or were never started.
func (c *Collector) sampleAll(ctx context.Context, targets []target) []store.ContainerStatsSample {
	sem := make(chan struct{}, maxParallelSamples)
	results := make([]store.ContainerStatsSample, len(targets))
	found := make([]bool, len(targets))
	var wg sync.WaitGroup

dispatch:
	for i, t := range targets {
		// Acquire the semaphore here, in the dispatching loop, so ctx
		// cancellation stops the queue instead of spawning goroutines that
		// block on a slot. A bare `break` inside the select would only leave
		// the select (staticcheck SA4011), hence the labelled break.
		select {
		case sem <- struct{}{}:
		case <-ctx.Done():
			break dispatch
		}
		// select chooses at random when both cases are ready, so a slot may
		// have been acquired after cancellation; don't start new work then.
		if ctx.Err() != nil {
			break dispatch
		}

		wg.Add(1)
		go func(i int, t target) {
			defer wg.Done()
			defer func() { <-sem }()

			sampleCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
			defer cancel()
			stats, err := c.docker.GetContainerStats(sampleCtx, t.ContainerID)
			if err != nil {
				slog.Debug("stats collector: get container stats",
					"container", t.ContainerID, "owner_type", t.OwnerType, "error", err)
				return
			}

			// Guard against a missing/zero timestamp from Docker.
			ts := stats.Timestamp.Unix()
			if ts <= 0 {
				ts = time.Now().UTC().Unix()
			}
			// Each goroutine writes only its own index, and the slices are
			// read only after wg.Wait, so no further locking is needed.
			results[i] = store.ContainerStatsSample{
				ContainerID:     t.ContainerID,
				OwnerType:       t.OwnerType,
				OwnerID:         t.OwnerID,
				TS:              ts,
				CPUPercent:      stats.CPUPercent,
				MemoryUsage:     stats.MemoryUsage,
				MemoryLimit:     stats.MemoryLimit,
				NetworkRxBytes:  stats.NetworkRxBytes,
				NetworkTxBytes:  stats.NetworkTxBytes,
				BlockReadBytes:  stats.BlockReadBytes,
				BlockWriteBytes: stats.BlockWriteBytes,
			}
			found[i] = true
		}(i, t)
	}
	wg.Wait()

	// Compact successful samples in place; the write index never overtakes
	// the read index, and results is not used afterwards.
	out := results[:0]
	for i := range results {
		if found[i] {
			out = append(out, results[i])
		}
	}
	return out
}
// recordSystemSample fetches host-level stats from Docker and persists them
// together with the workload totals supplied by the caller. Errors from
// either step are logged as warnings and not returned.
func (c *Collector) recordSystemSample(ctx context.Context, workloadCPU float64, workloadMem int64, running int) {
	sysStats, err := c.docker.GetSystemStats(ctx)
	if err != nil {
		slog.Warn("stats collector: get system stats", "error", err)
		return
	}

	// Fall back to the wall clock when Docker reports no usable timestamp.
	ts := sysStats.Timestamp.Unix()
	if ts <= 0 {
		ts = time.Now().UTC().Unix()
	}

	// With no container samples counted (e.g. the very first tick racing
	// container readiness), prefer Docker's own running-container count.
	containersRunning := running
	if running == 0 && sysStats.Running > 0 {
		containersRunning = sysStats.Running
	}

	err = c.store.InsertSystemStatsSample(store.SystemStatsSample{
		TS:                 ts,
		NCPU:               sysStats.NCPU,
		MemoryTotal:        sysStats.MemoryTotal,
		WorkloadCPUPercent: workloadCPU,
		WorkloadMemUsage:   workloadMem,
		ContainersRunning:  containersRunning,
		DiskTotalBytes:     sysStats.DiskTotalBytes,
	})
	if err != nil {
		slog.Warn("stats collector: insert system sample", "error", err)
	}
}
// prune asks the store to drop stats samples whose timestamp is older than
// retentionHours before now. A non-positive retention disables pruning.
// Errors are logged as warnings and not returned.
func (c *Collector) prune(retentionHours int) {
	if retentionHours > 0 {
		window := time.Duration(retentionHours) * time.Hour
		// Unix seconds are location-independent, so no UTC conversion is
		// needed before taking the cutoff.
		cutoff := time.Now().Add(-window).Unix()
		if _, err := c.store.PruneStatsSamplesBefore(cutoff); err != nil {
			slog.Warn("stats collector: prune", "error", err)
		}
	}
}