// Package stats implements a background goroutine that periodically samples // Docker container and host-level resource usage and persists the samples // into SQLite. It reads its interval and retention from settings on every // tick so configuration changes take effect without a restart. package stats import ( "context" "log/slog" "sync" "time" "github.com/alexei/tinyforge/internal/docker" "github.com/alexei/tinyforge/internal/store" ) // Defaults applied when settings values are outside their valid range. const ( DefaultIntervalSeconds = 15 DefaultRetentionHours = 2 MinIntervalSeconds = 5 MaxIntervalSeconds = 300 // Hard cap on parallel container stat requests to avoid overwhelming // the Docker daemon when the user has many containers. maxParallelSamples = 8 ) // OwnerType values for ContainerStatsSample.OwnerType. const ( OwnerTypeInstance = "instance" OwnerTypeSite = "site" ) // Collector runs the background sampling loop. type Collector struct { store *store.Store docker *docker.Client startOnce sync.Once stopOnce sync.Once started bool stop chan struct{} done chan struct{} } // New creates a new stats collector. Call Start to begin sampling. func New(s *store.Store, d *docker.Client) *Collector { return &Collector{ store: s, docker: d, stop: make(chan struct{}), done: make(chan struct{}), } } // Start launches the background loop. Returns immediately. The loop exits // when Stop is called. Safe to call multiple times — only the first call has // an effect. func (c *Collector) Start() { c.startOnce.Do(func() { c.started = true go c.run() }) } // Stop signals the collector to exit and blocks until it has finished the // in-flight tick. If Start was never called, Stop returns immediately. func (c *Collector) Stop() { c.stopOnce.Do(func() { close(c.stop) if !c.started { close(c.done) } }) <-c.done } // run is the main loop. It reads the interval from settings on every tick, // which lets configuration changes propagate within one tick without a // dedicated reload mechanism. func (c *Collector) run() { defer close(c.done) // Derive a base context that's cancelled when Stop is called so in-flight // Docker requests abort instead of waiting out their timeout. baseCtx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { <-c.stop cancel() }() // Wait a few seconds before the first sample so the app has settled. select { case <-time.After(3 * time.Second): case <-c.stop: return } for { interval, retention := c.readConfig() if interval == 0 || retention == 0 { // Collection disabled. Poll settings every minute in case the // user re-enables it. select { case <-time.After(time.Minute): continue case <-c.stop: return } } c.tick(baseCtx, retention) select { case <-time.After(time.Duration(interval) * time.Second): case <-c.stop: return } } } // readConfig reads the current interval + retention from settings, applying // defaults and clamping to the valid range. func (c *Collector) readConfig() (intervalSeconds, retentionHours int) { settings, err := c.store.GetSettings() if err != nil { slog.Warn("stats collector: failed to read settings — using defaults", "error", err) return DefaultIntervalSeconds, DefaultRetentionHours } intervalSeconds = settings.StatsIntervalSeconds retentionHours = settings.StatsRetentionHours if intervalSeconds < 0 || retentionHours < 0 { return 0, 0 } if intervalSeconds > 0 && intervalSeconds < MinIntervalSeconds { intervalSeconds = MinIntervalSeconds } if intervalSeconds > MaxIntervalSeconds { intervalSeconds = MaxIntervalSeconds } return intervalSeconds, retentionHours } // tick samples all known containers, aggregates workload-level totals, // persists samples, and prunes rows beyond the retention window. When // the Docker daemon is unreachable the whole tick is skipped with a // single debug log instead of one warning per container. func (c *Collector) tick(parent context.Context, retentionHours int) { ctx, cancel := context.WithTimeout(parent, 30*time.Second) defer cancel() pingCtx, pingCancel := context.WithTimeout(ctx, 2*time.Second) defer pingCancel() if err := c.docker.Ping(pingCtx); err != nil { slog.Debug("stats collector: docker unreachable, skipping tick", "error", err) return } targets := c.buildTargets() if len(targets) == 0 { // No containers to sample, but still record a system sample so the // host history isn't empty. c.recordSystemSample(ctx, 0, 0, 0) c.prune(retentionHours) return } samples := c.sampleAll(ctx, targets) var ( totalCPU float64 totalMem int64 running int ) for _, s := range samples { if err := c.store.InsertContainerStatsSample(s); err != nil { slog.Warn("stats collector: insert container sample", "container", s.ContainerID, "error", err) continue } totalCPU += s.CPUPercent totalMem += s.MemoryUsage running++ } c.recordSystemSample(ctx, totalCPU, totalMem, running) c.prune(retentionHours) } // target describes a single container to sample. type target struct { ContainerID string OwnerType string OwnerID string } // buildTargets fetches container rows that have a docker container_id bound. // Project containers and stack containers are surfaced as OwnerTypeInstance // (the stats sample owner_type is kept for back-compat with the persisted // schema and the dashboard's group-by semantics). func (c *Collector) buildTargets() []target { var out []target containers, err := c.store.ListContainers(store.ContainerFilter{}) if err != nil { slog.Warn("stats collector: list containers", "error", err) } else { for _, row := range containers { if row.ContainerID == "" { continue } ownerType := OwnerTypeInstance if row.WorkloadKind == string(store.WorkloadKindSite) { ownerType = OwnerTypeSite } out = append(out, target{ ContainerID: row.ContainerID, OwnerType: ownerType, OwnerID: row.ID, }) } } return out } // sampleAll fetches Docker stats for every target in bounded parallelism. // Failed samples are logged and skipped — a missing container must not kill // the whole tick. func (c *Collector) sampleAll(ctx context.Context, targets []target) []store.ContainerStatsSample { sem := make(chan struct{}, maxParallelSamples) results := make([]store.ContainerStatsSample, len(targets)) found := make([]bool, len(targets)) var wg sync.WaitGroup loop: for i, t := range targets { // Acquire the semaphore in the parent loop so ctx cancellation // short-circuits the queue rather than spawning goroutines that // block on an unreachable slot. The labelled break exits the for // loop directly; a bare `break` inside `select` would only break // the select and let the loop continue. select { case sem <- struct{}{}: case <-ctx.Done(): break loop } wg.Add(1) go func(i int, t target) { defer wg.Done() defer func() { <-sem }() sampleCtx, cancel := context.WithTimeout(ctx, 10*time.Second) defer cancel() stats, err := c.docker.GetContainerStats(sampleCtx, t.ContainerID) if err != nil { slog.Debug("stats collector: get container stats", "container", t.ContainerID, "owner_type", t.OwnerType, "error", err) return } ts := stats.Timestamp.Unix() if ts <= 0 { ts = time.Now().UTC().Unix() } results[i] = store.ContainerStatsSample{ ContainerID: t.ContainerID, OwnerType: t.OwnerType, OwnerID: t.OwnerID, TS: ts, CPUPercent: stats.CPUPercent, MemoryUsage: stats.MemoryUsage, MemoryLimit: stats.MemoryLimit, NetworkRxBytes: stats.NetworkRxBytes, NetworkTxBytes: stats.NetworkTxBytes, BlockReadBytes: stats.BlockReadBytes, BlockWriteBytes: stats.BlockWriteBytes, } found[i] = true }(i, t) } wg.Wait() out := results[:0] for i := range results { if found[i] { out = append(out, results[i]) } } return out } // recordSystemSample fetches host info + disk usage and persists a combined // system-level sample. Failures are warned but do not propagate. func (c *Collector) recordSystemSample(ctx context.Context, workloadCPU float64, workloadMem int64, running int) { sysStats, err := c.docker.GetSystemStats(ctx) if err != nil { slog.Warn("stats collector: get system stats", "error", err) return } ts := sysStats.Timestamp.Unix() if ts <= 0 { ts = time.Now().UTC().Unix() } sample := store.SystemStatsSample{ TS: ts, NCPU: sysStats.NCPU, MemoryTotal: sysStats.MemoryTotal, WorkloadCPUPercent: workloadCPU, WorkloadMemUsage: workloadMem, ContainersRunning: running, DiskTotalBytes: sysStats.DiskTotalBytes, } // Prefer the Docker-reported running count when we have no running samples // (e.g., very first tick may race with container readiness). if running == 0 && sysStats.Running > 0 { sample.ContainersRunning = sysStats.Running } if err := c.store.InsertSystemStatsSample(sample); err != nil { slog.Warn("stats collector: insert system sample", "error", err) } } // prune drops rows older than the retention window. func (c *Collector) prune(retentionHours int) { if retentionHours <= 0 { return } cutoff := time.Now().UTC().Add(-time.Duration(retentionHours) * time.Hour).Unix() if _, err := c.store.PruneStatsSamplesBefore(cutoff); err != nil { slog.Warn("stats collector: prune", "error", err) } }