diff --git a/client/allocrunner/taskrunner/stats_hook.go b/client/allocrunner/taskrunner/stats_hook.go
index e227f9524..24dee30e5 100644
--- a/client/allocrunner/taskrunner/stats_hook.go
+++ b/client/allocrunner/taskrunner/stats_hook.go
@@ -88,56 +88,18 @@ func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *inte
 
 // Collection ends when the passed channel is closed
 func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) {
-	ch, err := handle.Stats(ctx, h.interval)
+MAIN:
+	ch, err := h.callStatsWithRetry(ctx, handle)
 	if err != nil {
-		// Check if the driver doesn't implement stats
-		if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
-			h.logger.Debug("driver does not support stats")
-			return
-		}
-		h.logger.Error("failed to start stats collection for task", "error", err)
+		return
 	}
 
-	var backoff time.Duration
-	var retry int
-	limit := time.Second * 5
 	for {
-		time.Sleep(backoff)
 		select {
 		case ru, ok := <-ch:
-			// Channel is closed
+			// if the channel closes, establish a new one
 			if !ok {
-				var re *structs.RecoverableError
-				ch, err = handle.Stats(ctx, h.interval)
-				if err == nil {
-					goto RETRY
-				}
-
-				// We do not log when the plugin is shutdown since this is
-				// likely because the driver plugin has unexpectedly exited,
-				// in which case sleeping and trying again or returning based
-				// on the stop channel is the correct behavior
-				if err != bstructs.ErrPluginShutdown {
-					h.logger.Debug("error fetching stats of task", "error", err)
-					goto RETRY
-				}
-				// check if the error is terminal otherwise it's likely a
-				// transport error and we should retry
-				re, ok = err.(*structs.RecoverableError)
-				if ok && re.IsUnrecoverable() {
-					return
-				}
-				h.logger.Warn("stats collection for task failed", "error", err)
-			RETRY:
-				// Calculate the new backoff
-				backoff = (1 << (2 * uint64(retry))) * time.Second
-				if backoff > limit {
-					backoff = limit
-				}
-				// Increment retry counter
-				retry++
-
-				continue
+				goto MAIN
 			}
 
 			// Update stats on TaskRunner and emit them
@@ -149,6 +111,58 @@ func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interf
 	}
 }
 
+// callStatsWithRetry invokes the driver's Stats() function and retries until a
+// channel is established successfully, or returns an error on a permanent failure.
+//
+// It logs errors at appropriate levels; callers should not log the returned error.
+func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) {
+	var retry int
+
+MAIN:
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	}
+
+	ch, err := handle.Stats(ctx, h.interval)
+	if err == nil {
+		return ch, nil
+	}
+
+	// Check if the driver doesn't implement stats
+	if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
+		h.logger.Debug("driver does not support stats")
+		return nil, err
+	}
+
+	// check if the error is terminal otherwise it's likely a
+	// transport error and we should retry
+	if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() {
+		return nil, err
+	}
+
+	// We only log at debug level when the plugin has shut down since this is
+	// likely because the driver plugin has unexpectedly exited, in which case
+	// sleeping and trying again or returning based on the stop channel is the
+	// correct behavior
+	if err == bstructs.ErrPluginShutdown {
+		h.logger.Debug("failed to fetch stats of task", "error", err)
+	} else {
+		h.logger.Error("failed to start stats collection for task", "error", err)
+	}
+
+	limit := time.Second * 5
+	backoff := 1 << (2 * uint64(retry)) * time.Second
+	if backoff > limit || retry > 5 {
+		backoff = limit
+	}
+
+	// Increment retry counter
+	retry++
+
+	time.Sleep(backoff)
+	goto MAIN
+}
+
 func (h *statsHook) Shutdown() {
 	h.mu.Lock()
 	defer h.mu.Unlock()
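For reference, a minimal standalone sketch of the backoff schedule the new retry loop produces, using the same `1 << (2 * retry)` growth and 5-second cap as the patch; the `main` wrapper below is illustrative only and not part of the change:

// Illustrative sketch only (not part of the diff): prints the backoff schedule
// callStatsWithRetry would use, i.e. 1s, 4s, then the 5s cap thereafter.
package main

import (
	"fmt"
	"time"
)

func main() {
	limit := time.Second * 5
	for retry := 0; retry < 6; retry++ {
		backoff := 1 << (2 * uint64(retry)) * time.Second
		if backoff > limit || retry > 5 {
			backoff = limit
		}
		fmt.Printf("retry %d: sleep %v\n", retry, backoff)
	}
}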