diff --git a/drivers/docker/stats.go b/drivers/docker/stats.go
index 8ef12add7..a139e5998 100644
--- a/drivers/docker/stats.go
+++ b/drivers/docker/stats.go
@@ -93,34 +93,9 @@ func (h *taskHandle) Stats(ctx context.Context, interval time.Duration, compute
 func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, interval time.Duration, compute cpustats.Compute) {
 	defer destCh.close()
 
-	timer, cancel := helper.NewSafeTimer(interval)
+	ticker, cancel := helper.NewSafeTicker(interval)
 	defer cancel()
-
-	// we need to use the streaming stats API here because our calculation for
-	// CPU usage depends on having the values from the previous read, which are
-	// not available in one-shot
-	statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
-	if err != nil && err != io.EOF {
-		h.logger.Debug("error collecting stats from container", "error", err)
-		return
-	}
-	defer statsReader.Body.Close()
-
-	collectOnce := func() {
-		defer timer.Reset(interval)
-		var stats *containerapi.Stats
-		err := json.NewDecoder(statsReader.Body).Decode(&stats)
-		if err != nil && err != io.EOF {
-			h.logger.Debug("error decoding stats data from container", "error", err)
-			return
-		}
-		if stats == nil {
-			h.logger.Debug("error decoding stats data: stats were nil")
-			return
-		}
-		resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
-		destCh.send(resourceUsage)
-	}
+	var stats *containerapi.Stats
 
 	for {
 		select {
@@ -128,8 +103,31 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
 			return
 		case <-h.doneCh:
 			return
-		case <-timer.C:
-			collectOnce()
+		case <-ticker.C:
+			// we need to use the streaming stats API here because our calculation for
+			// CPU usage depends on having the values from the previous read, which are
+			// not available in one-shot. The stats stream can be reused over time, but
+			// that requires synchronization, which restricts the interval for the metrics.
+			statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
+			if err != nil && err != io.EOF {
+				h.logger.Debug("error collecting stats from container", "error", err)
+				return
+			}
+
+			err = json.NewDecoder(statsReader.Body).Decode(&stats)
+			statsReader.Body.Close()
+			if err != nil && err != io.EOF {
+				h.logger.Error("error decoding stats data from container", "error", err)
+				return
+			}
+
+			if stats == nil {
+				h.logger.Error("error decoding stats data: stats were nil")
+				return
+			}
+
+			resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
+			destCh.send(resourceUsage)
 		}
 	}
 }
diff --git a/helper/funcs.go b/helper/funcs.go
index 536676dcf..e251328f6 100644
--- a/helper/funcs.go
+++ b/helper/funcs.go
@@ -387,6 +387,26 @@ func NewSafeTimer(duration time.Duration) (*time.Timer, StopFunc) {
 	return t, cancel
 }
 
+// NewSafeTicker creates a time.Ticker but does not panic if duration is <= 0.
+//
+// Returns the time.Ticker and also a StopFunc, forcing the caller to deal
+// with stopping the time.Ticker so that its resources are released.
+func NewSafeTicker(duration time.Duration) (*time.Ticker, StopFunc) {
+	if duration <= 0 {
+		// time.NewTicker panics on a non-positive duration, so clamp to the
+		// smallest positive value (1ns). Note that such a ticker ticks almost
+		// continuously.
+		duration = 1
+	}
+
+	t := time.NewTicker(duration)
+	cancel := func() {
+		t.Stop()
+	}
+
+	return t, cancel
+}
+
 // NewStoppedTimer creates a time.Timer in a stopped state. This is useful when
 // the actual wait time will computed and set later via Reset.
 func NewStoppedTimer() (*time.Timer, StopFunc) {