[gh-24339] Move from streaming stats to polling for docker (#24525)

* fix: don't stream the docker stats, read them one by one

* func: add a NewSafeTicker to the helper functions

* style: remove commented code
This commit is contained in:
Juana De La Cuesta
2024-11-21 17:36:53 +01:00
committed by GitHub
parent bfb714144e
commit a9e7166b6b
2 changed files with 49 additions and 29 deletions

View File

@@ -93,34 +93,9 @@ func (h *taskHandle) Stats(ctx context.Context, interval time.Duration, compute
func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, interval time.Duration, compute cpustats.Compute) {
defer destCh.close()
timer, cancel := helper.NewSafeTimer(interval)
ticker, cancel := helper.NewSafeTicker(interval)
defer cancel()
// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
}
defer statsReader.Body.Close()
collectOnce := func() {
defer timer.Reset(interval)
var stats *containerapi.Stats
err := json.NewDecoder(statsReader.Body).Decode(&stats)
if err != nil && err != io.EOF {
h.logger.Debug("error decoding stats data from container", "error", err)
return
}
if stats == nil {
h.logger.Debug("error decoding stats data: stats were nil")
return
}
resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
}
var stats *containerapi.Stats
for {
select {
@@ -128,8 +103,31 @@ func (h *taskHandle) collectStats(ctx context.Context, destCh *usageSender, inte
return
case <-h.doneCh:
return
case <-timer.C:
collectOnce()
case <-ticker.C:
// we need to use the streaming stats API here because our calculation for
// CPU usage depends on having the values from the previous read, which are
// not available in one-shot. The stats stream can be reused over time,
// but that requires synchronization, which restricts the interval for the metrics.
statsReader, err := h.dockerClient.ContainerStats(ctx, h.containerID, true)
if err != nil && err != io.EOF {
h.logger.Debug("error collecting stats from container", "error", err)
return
}
err = json.NewDecoder(statsReader.Body).Decode(&stats)
statsReader.Body.Close()
if err != nil && err != io.EOF {
h.logger.Error("error decoding stats data from container", "error", err)
return
}
if stats == nil {
h.logger.Error("error decoding stats data: stats were nil")
return
}
resourceUsage := util.DockerStatsToTaskResourceUsage(stats, compute)
destCh.send(resourceUsage)
}
}
}

View File

@@ -387,6 +387,28 @@ func NewSafeTimer(duration time.Duration) (*time.Timer, StopFunc) {
return t, cancel
}
// NewSafeTicker wraps time.NewTicker so that a zero or negative duration does
// not cause a panic.
//
// It returns the ticker together with a StopFunc that stops it. Callers are
// expected to invoke the StopFunc once they are done with the ticker.
func NewSafeTicker(duration time.Duration) (*time.Ticker, StopFunc) {
	// time.NewTicker panics on non-positive durations, so clamp the value to
	// the smallest positive duration (one nanosecond) instead.
	if duration <= 0 {
		duration = 1
	}

	ticker := time.NewTicker(duration)
	return ticker, ticker.Stop
}
// NewStoppedTimer creates a time.Timer in a stopped state. This is useful when
// the actual wait time will be computed and set later via Reset.
func NewStoppedTimer() (*time.Timer, StopFunc) {