client/taskrunner: fix stats retry logic

Previously, if the stats channel was closed, we would retry the Stats call. But if
that retry failed, we entered a backoff loop without ever calling Stats again.

Here, we use a utility function for calling driverHandle.Stats that retries as one
would expect (a standalone sketch of the pattern is included below).

I aimed to preserve the logging formats but made small improvements as I saw fit.
Mahmood Ali
2019-07-11 13:35:44 +08:00
parent d8a43b2066
commit bbf8f90ecb
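
The fix follows a standard call-with-retry shape: keep invoking the call under capped
exponential backoff until it either yields a channel or fails permanently, and have the
consumer loop back into the helper whenever its channel closes. Below is a minimal
standalone sketch of that pattern; the names (openWithRetry, errPermanent, consume) and
signatures are illustrative, not Nomad's API:

package retryexample

import (
	"context"
	"errors"
	"time"
)

// errPermanent is an illustrative stand-in for an unrecoverable driver error.
var errPermanent = errors.New("permanent failure")

// openWithRetry calls open until it returns a channel or a permanent error,
// sleeping with capped exponential backoff between transient failures.
func openWithRetry(ctx context.Context, open func() (<-chan int, error)) (<-chan int, error) {
	const limit = 5 * time.Second
	for retry := 0; ; retry++ {
		if ctx.Err() != nil {
			return nil, ctx.Err()
		}
		ch, err := open()
		if err == nil {
			return ch, nil
		}
		if errors.Is(err, errPermanent) {
			// terminal failure: report it instead of retrying
			return nil, err
		}
		backoff := (1 << (2 * uint(retry))) * time.Second
		if backoff > limit || retry > 5 {
			backoff = limit
		}
		time.Sleep(backoff)
	}
}

// consume shows the caller side: when the channel closes, it re-establishes
// a new one through the helper instead of spinning on the closed channel.
func consume(ctx context.Context, open func() (<-chan int, error)) {
	for {
		ch, err := openWithRetry(ctx, open)
		if err != nil {
			return
		}
		for v := range ch {
			_ = v // handle the received value
		}
	}
}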


@@ -88,56 +88,18 @@ func (h *statsHook) Exited(context.Context, *interfaces.TaskExitedRequest, *inte
 // Collection ends when the passed channel is closed
 func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interfaces.DriverStats) {
-	ch, err := handle.Stats(ctx, h.interval)
+MAIN:
+	ch, err := h.callStatsWithRetry(ctx, handle)
 	if err != nil {
-		// Check if the driver doesn't implement stats
-		if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
-			h.logger.Debug("driver does not support stats")
-			return
-		}
-
-		h.logger.Error("failed to start stats collection for task", "error", err)
 		return
 	}
 
-	var backoff time.Duration
-	var retry int
-	limit := time.Second * 5
 	for {
-		time.Sleep(backoff)
 		select {
 		case ru, ok := <-ch:
-			// Channel is closed
+			// if channel closes, re-establish a new one
 			if !ok {
-				var re *structs.RecoverableError
-				ch, err = handle.Stats(ctx, h.interval)
-				if err == nil {
-					goto RETRY
-				}
-
-				// We do not log when the plugin is shutdown since this is
-				// likely because the driver plugin has unexpectedly exited,
-				// in which case sleeping and trying again or returning based
-				// on the stop channel is the correct behavior
-				if err != bstructs.ErrPluginShutdown {
-					h.logger.Debug("error fetching stats of task", "error", err)
-					goto RETRY
-				}
-
-				// check if the error is terminal otherwise it's likely a
-				// transport error and we should retry
-				re, ok = err.(*structs.RecoverableError)
-				if ok && re.IsUnrecoverable() {
-					return
-				}
-
-				h.logger.Warn("stats collection for task failed", "error", err)
-
-			RETRY:
-				// Calculate the new backoff
-				backoff = (1 << (2 * uint64(retry))) * time.Second
-				if backoff > limit {
-					backoff = limit
-				}
-				// Increment retry counter
-				retry++
-
-				continue
+				goto MAIN
 			}
 
 			// Update stats on TaskRunner and emit them
@@ -149,6 +111,58 @@ func (h *statsHook) collectResourceUsageStats(ctx context.Context, handle interf
 	}
 }
 
+// callStatsWithRetry invokes the driver handle's Stats() function and retries
+// until the channel is established successfully. It returns an error only if
+// it encounters a permanent one.
+//
+// It logs errors at the appropriate levels; callers should not log the
+// returned error.
+func (h *statsHook) callStatsWithRetry(ctx context.Context, handle interfaces.DriverStats) (<-chan *cstructs.TaskResourceUsage, error) {
+	var retry int
+
+MAIN:
+	if ctx.Err() != nil {
+		return nil, ctx.Err()
+	}
+
+	ch, err := handle.Stats(ctx, h.interval)
+	if err == nil {
+		return ch, nil
+	}
+
+	// Check if the driver doesn't implement stats
+	if err.Error() == cstructs.DriverStatsNotImplemented.Error() {
+		h.logger.Debug("driver does not support stats")
+		return nil, err
+	}
+
+	// check if the error is terminal otherwise it's likely a
+	// transport error and we should retry
+	if re, ok := err.(*structs.RecoverableError); ok && re.IsUnrecoverable() {
+		return nil, err
+	}
+
+	// We do not warn when the plugin is shutdown since this is
+	// likely because the driver plugin has unexpectedly exited,
+	// in which case sleeping and trying again or returning based
+	// on the stop channel is the correct behavior
+	if err == bstructs.ErrPluginShutdown {
+		h.logger.Debug("failed to fetch stats of task", "error", err)
+	} else {
+		h.logger.Error("failed to start stats collection for task", "error", err)
+	}
+
+	limit := time.Second * 5
+	backoff := (1 << (2 * uint64(retry))) * time.Second
+	if backoff > limit || retry > 5 {
+		backoff = limit
+	}
+
+	// Increment retry counter
+	retry++
+
+	time.Sleep(backoff)
+
+	goto MAIN
+}
+
 func (h *statsHook) Shutdown() {
 	h.mu.Lock()
 	defer h.mu.Unlock()
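
For reference, the backoff expression in callStatsWithRetry grows as 4^retry seconds,
so the effective schedule is 1s, 4s, and then the 5s cap for every later attempt; the
retry > 5 test also keeps the cap in force once the shifted value would overflow and
wrap below the limit. A standalone sketch that prints the schedule (illustrative, not
part of the patch):

package main

import (
	"fmt"
	"time"
)

func main() {
	limit := time.Second * 5
	for retry := 0; retry < 8; retry++ {
		backoff := (1 << (2 * uint64(retry))) * time.Second
		if backoff > limit || retry > 5 {
			backoff = limit
		}
		// Prints 1s, 4s, then 5s for every subsequent retry.
		fmt.Printf("retry %d -> backoff %v\n", retry, backoff)
	}
}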