mirror of
https://github.com/kemko/nomad.git
synced 2026-01-04 01:15:43 +03:00
nomad: use fast and slow exponential backoff in worker
This commit is contained in:
@@ -39,7 +39,7 @@ func init() {
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultRPCAddr = &net.TCPAddr{IP: net.ParseIP("0.0.0.0"), Port: 4647}
|
||||
DefaultRPCAddr = &net.TCPAddr{IP: net.ParseIP("127.0.0.1"), Port: 4647}
|
||||
)
|
||||
|
||||
// Config is used to parameterize the server
|
||||
|
||||
@@ -13,11 +13,19 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// backoffBaseline is the baseline time for exponential backoff
|
||||
backoffBaseline = 20 * time.Millisecond
|
||||
// backoffBaselineFast is the baseline time for exponential backoff
|
||||
backoffBaselineFast = 20 * time.Millisecond
|
||||
|
||||
// backoffLimit is the limit of the exponential backoff
|
||||
backoffLimit = 5 * time.Second
|
||||
// backoffBaselineSlow is the baseline time for exponential backoff
|
||||
// but that is much slower than backoffBaselineFast
|
||||
backoffBaselineSlow = 500 * time.Millisecond
|
||||
|
||||
// backoffLimitFast is the limit of the exponential backoff
|
||||
backoffLimitFast = time.Second
|
||||
|
||||
// backoffLimitSlow is the limit of the exponential backoff for
|
||||
// the slower backoff
|
||||
backoffLimitSlow = 10 * time.Second
|
||||
|
||||
// dequeueTimeout is used to timeout an evaluation dequeue so that
|
||||
// we can check if there is a shutdown event
|
||||
@@ -132,7 +140,7 @@ REQ:
|
||||
metrics.MeasureSince([]string{"nomad", "worker", "dequeue_eval"}, start)
|
||||
if err != nil {
|
||||
w.logger.Printf("[ERR] worker: failed to dequeue evaluation: %v", err)
|
||||
if w.backoffErr() {
|
||||
if w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
|
||||
return nil, "", true
|
||||
}
|
||||
goto REQ
|
||||
@@ -206,7 +214,7 @@ CHECK:
|
||||
}
|
||||
|
||||
// Exponential back off if we haven't yet reached it
|
||||
if w.backoffErr() {
|
||||
if w.backoffErr(backoffBaselineFast, backoffLimitFast) {
|
||||
return fmt.Errorf("shutdown while waiting for state sync")
|
||||
}
|
||||
goto CHECK
|
||||
@@ -269,7 +277,7 @@ SUBMIT:
|
||||
if err := w.srv.RPC("Plan.Submit", &req, &resp); err != nil {
|
||||
w.logger.Printf("[ERR] worker: failed to submit plan for evaluation %s: %v",
|
||||
plan.EvalID, err)
|
||||
if w.shouldResubmit(err) && !w.backoffErr() {
|
||||
if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
|
||||
goto SUBMIT
|
||||
}
|
||||
return nil, nil, err
|
||||
@@ -332,7 +340,7 @@ SUBMIT:
|
||||
if err := w.srv.RPC("Eval.Update", &req, &resp); err != nil {
|
||||
w.logger.Printf("[ERR] worker: failed to update evaluation %#v: %v",
|
||||
eval, err)
|
||||
if w.shouldResubmit(err) && !w.backoffErr() {
|
||||
if w.shouldResubmit(err) && !w.backoffErr(backoffBaselineSlow, backoffLimitSlow) {
|
||||
goto SUBMIT
|
||||
}
|
||||
return err
|
||||
@@ -362,10 +370,10 @@ func (w *Worker) shouldResubmit(err error) bool {
|
||||
// maintained statefully for the worker. Returns if attempts should be
|
||||
// abandoned due to shutdown.
|
||||
// be made or abandoned.
|
||||
func (w *Worker) backoffErr() bool {
|
||||
backoff := (1 << (2 * w.failures)) * backoffBaseline
|
||||
if backoff > backoffLimit {
|
||||
backoff = backoffLimit
|
||||
func (w *Worker) backoffErr(base, limit time.Duration) bool {
|
||||
backoff := (1 << (2 * w.failures)) * base
|
||||
if backoff > limit {
|
||||
backoff = limit
|
||||
} else {
|
||||
w.failures++
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user