Delay Nack re-enqueue

Add a delay before re-enqueuing a Nacked evaluation. The delay starts
small and compounds on each subsequent Nack, creating back pressure on
evaluations that repeatedly fail to be processed.
Alex Dadgar
2017-04-12 13:39:19 -07:00
parent c005d29fc1
commit 57ed87324f
2 changed files with 279 additions and 54 deletions
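The schedule this describes, in concrete terms: no delay before the first dequeue, a short fixed delay after the first Nack, and a growing multiple of a larger delay for each Nack after that. The sketch below is illustrative only; reenqueueDelay mirrors the nackReenqueueDelay helper added by this commit, using the new default constants.

```go
package main

import (
	"fmt"
	"time"
)

// Defaults introduced by this commit.
const (
	initialNackReenqueueDelay    = time.Second
	subsequentNackReenqueueDelay = 20 * time.Second
)

// reenqueueDelay mirrors nackReenqueueDelay: no delay before the first
// dequeue, the initial delay after the first Nack, and a compounding
// multiple of the subsequent delay after that.
func reenqueueDelay(prevDequeues int) time.Duration {
	switch {
	case prevDequeues <= 0:
		return 0
	case prevDequeues == 1:
		return initialNackReenqueueDelay
	default:
		return time.Duration(prevDequeues-1) * subsequentNackReenqueueDelay
	}
}

func main() {
	for dequeues := 0; dequeues <= 4; dequeues++ {
		fmt.Printf("dequeues=%d -> wait %v\n", dequeues, reenqueueDelay(dequeues))
	}
	// dequeues=0 -> wait 0s
	// dequeues=1 -> wait 1s
	// dequeues=2 -> wait 20s
	// dequeues=3 -> wait 40s
	// dequeues=4 -> wait 1m0s
}
```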


@@ -28,6 +28,14 @@ var (
// ErrNackTimeoutReached is returned if an expired evaluation is reset
ErrNackTimeoutReached = errors.New("evaluation nack timeout reached")
// initialNackReenqueueDelay is the delay applied before re-enqueuing a
// Nacked evaluation for the first time
initialNackReenqueueDelay = time.Second
// subsequentNackReenqueueDelay is a compounding delay applied on each
// subsequent Nack for an evaluation.
subsequentNackReenqueueDelay = 20 * time.Second
)
// EvalBroker is used to manage brokering of evaluations. When an evaluation is
@@ -76,6 +84,10 @@ type EvalBroker struct {
// timeWait has evaluations that are waiting for time to elapse
timeWait map[string]*time.Timer
// Nack delays default to the package-level values and are exposed as fields only so tests can override them
initialNackDelay time.Duration
subsequentNackDelay time.Duration
l sync.RWMutex
}
@@ -100,18 +112,20 @@ func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error
return nil, fmt.Errorf("timeout cannot be negative")
}
b := &EvalBroker{
- nackTimeout: timeout,
- deliveryLimit: deliveryLimit,
- enabled: false,
- stats: new(BrokerStats),
- evals: make(map[string]int),
- jobEvals: make(map[string]string),
- blocked: make(map[string]PendingEvaluations),
- ready: make(map[string]PendingEvaluations),
- unack: make(map[string]*unackEval),
- waiting: make(map[string]chan struct{}),
- requeue: make(map[string]*structs.Evaluation),
- timeWait: make(map[string]*time.Timer),
+ nackTimeout: timeout,
+ deliveryLimit: deliveryLimit,
+ enabled: false,
+ stats: new(BrokerStats),
+ evals: make(map[string]int),
+ jobEvals: make(map[string]string),
+ blocked: make(map[string]PendingEvaluations),
+ ready: make(map[string]PendingEvaluations),
+ unack: make(map[string]*unackEval),
+ waiting: make(map[string]chan struct{}),
+ requeue: make(map[string]*structs.Evaluation),
+ timeWait: make(map[string]*time.Timer),
+ initialNackDelay: initialNackReenqueueDelay,
+ subsequentNackDelay: subsequentNackReenqueueDelay,
}
b.stats.ByScheduler = make(map[string]*SchedulerStats)
return b, nil
@@ -187,17 +201,23 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) {
// Check if we need to enforce a wait
if eval.Wait > 0 {
- timer := time.AfterFunc(eval.Wait, func() {
- b.enqueueWaiting(eval)
- })
- b.timeWait[eval.ID] = timer
- b.stats.TotalWaiting += 1
+ b.processWaitingEnqueue(eval)
return
}
b.enqueueLocked(eval, eval.Type)
}
// processWaitingEnqueue waits the given duration on the evaluation before
// enqueueing.
func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) {
timer := time.AfterFunc(eval.Wait, func() {
b.enqueueWaiting(eval)
})
b.timeWait[eval.ID] = timer
b.stats.TotalWaiting += 1
}
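The new processWaitingEnqueue helper factors out the timer pattern processEnqueue previously used inline: defer the real enqueue with time.AfterFunc and keep the timer handle keyed by evaluation ID in timeWait so a pending enqueue can later be found and, if needed, stopped. A stripped-down, standalone illustration of that pattern; the waitQueue type is invented for this sketch and is not part of the broker.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitQueue is a simplified stand-in for the broker's timeWait handling:
// it defers an enqueue by a given duration and remembers the timer so a
// pending enqueue can still be cancelled.
type waitQueue struct {
	mu     sync.Mutex
	timers map[string]*time.Timer
	ready  chan string
}

func newWaitQueue() *waitQueue {
	return &waitQueue{
		timers: make(map[string]*time.Timer),
		ready:  make(chan string, 16),
	}
}

// enqueueAfter schedules id to become ready after wait, mirroring the
// time.AfterFunc pattern used by processWaitingEnqueue.
func (q *waitQueue) enqueueAfter(id string, wait time.Duration) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.timers[id] = time.AfterFunc(wait, func() {
		q.mu.Lock()
		delete(q.timers, id)
		q.mu.Unlock()
		q.ready <- id
	})
}

// cancel stops a pending enqueue if its timer has not fired yet.
func (q *waitQueue) cancel(id string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if t, ok := q.timers[id]; ok {
		t.Stop()
		delete(q.timers, id)
	}
}

func main() {
	q := newWaitQueue()
	q.enqueueAfter("eval-1", 10*time.Millisecond)
	q.enqueueAfter("eval-2", time.Hour)
	q.cancel("eval-2")               // never becomes ready
	fmt.Println("ready:", <-q.ready) // prints eval-1 after ~10ms
}
```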
// enqueueWaiting is used to enqueue a waiting evaluation
func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) {
b.l.Lock()
@@ -547,14 +567,40 @@ func (b *EvalBroker) Nack(evalID, token string) error {
// Check if we've hit the delivery limit, and re-enqueue
// in the failedQueue
- if b.evals[evalID] >= b.deliveryLimit {
+ if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit {
b.enqueueLocked(unack.Eval, failedQueue)
} else {
- b.enqueueLocked(unack.Eval, unack.Eval.Type)
+ e := unack.Eval
+ e.Wait = b.nackReenqueueDelay(e, dequeues)
+ // See if there should be a delay before re-enqueuing
+ if e.Wait > 0 {
+ b.processWaitingEnqueue(e)
+ } else {
+ b.enqueueLocked(e, e.Type)
+ }
}
return nil
}
// nackReenqueueDelay determines the delay to apply before re-enqueuing
// an evaluation, given the number of previous dequeue attempts
func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration {
var delay time.Duration
switch {
case prevDequeues <= 0:
case prevDequeues == 1:
delay = b.initialNackDelay
default:
// For each subsequent Nack, compound the delay
delay = time.Duration(prevDequeues-1) * b.subsequentNackDelay
}
return delay
}
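Combined with the pre-existing delivery limit, repeated Nacks now trade latency for back pressure: each retry is pushed further out until the limit finally moves the evaluation to the failed queue. A rough worked example using the new default delays and an assumed delivery limit of 3 (the limit is existing broker configuration, chosen here only for illustration); the test changes later in this commit exercise the same path with the delays tuned down to milliseconds.

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Default delays from this commit; deliveryLimit is an assumed example.
	initial := time.Second
	subsequent := 20 * time.Second
	deliveryLimit := 3

	var total time.Duration
	for dequeues := 1; dequeues < deliveryLimit; dequeues++ {
		// Same rule as nackReenqueueDelay: the first Nack waits the initial
		// delay, later Nacks wait a growing multiple of the subsequent one.
		wait := initial
		if dequeues > 1 {
			wait = time.Duration(dequeues-1) * subsequent
		}
		total += wait
		fmt.Printf("Nack #%d: re-enqueued after %v (cumulative back-off %v)\n",
			dequeues, wait, total)
	}
	fmt.Printf("Nack #%d: delivery limit reached, moved to the failed queue\n",
		deliveryLimit)
}
```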
// PauseNackTimeout is used to pause the Nack timeout for an eval that is making
// progress but is in a potentially unbounded operation such as the plan queue.
func (b *EvalBroker) PauseNackTimeout(evalID, token string) error {


@@ -1,11 +1,13 @@
package nomad
import (
"fmt"
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
var (
@@ -23,6 +25,11 @@ func testBroker(t *testing.T, timeout time.Duration) *EvalBroker {
if err != nil {
t.Fatalf("err: %v", err)
}
// Tune the Nack delay
b.initialNackDelay = 5 * time.Millisecond
b.subsequentNackDelay = 50 * time.Millisecond
return b
}
@@ -126,19 +133,28 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
}
// Check the stats
- stats = b.Stats()
- if stats.TotalReady != 1 {
- t.Fatalf("bad: %#v", stats)
- }
- if stats.TotalUnacked != 0 {
- t.Fatalf("bad: %#v", stats)
- }
- if stats.ByScheduler[eval.Type].Ready != 1 {
- t.Fatalf("bad: %#v", stats)
- }
- if stats.ByScheduler[eval.Type].Unacked != 0 {
- t.Fatalf("bad: %#v", stats)
- }
+ testutil.WaitForResult(func() (bool, error) {
+ stats = b.Stats()
+ if stats.TotalReady != 1 {
+ return false, fmt.Errorf("bad: %#v", stats)
+ }
+ if stats.TotalUnacked != 0 {
+ return false, fmt.Errorf("bad: %#v", stats)
+ }
+ if stats.TotalWaiting != 0 {
+ return false, fmt.Errorf("bad: %#v", stats)
+ }
+ if stats.ByScheduler[eval.Type].Ready != 1 {
+ return false, fmt.Errorf("bad: %#v", stats)
+ }
+ if stats.ByScheduler[eval.Type].Unacked != 0 {
+ return false, fmt.Errorf("bad: %#v", stats)
+ }
+ return true, nil
+ }, func(e error) {
+ t.Fatal(e)
+ })
// Dequeue should work again
out2, token2, err := b.Dequeue(defaultSched, time.Second)
@@ -192,6 +208,163 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) {
}
}
func TestEvalBroker_Nack_Delay(t *testing.T) {
b := testBroker(t, 0)
// Enable the broker and enqueue an evaluation
b.SetEnabled(true)
eval := mock.Eval()
b.Enqueue(eval)
// Dequeue should work
out, token, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out != eval {
t.Fatalf("bad : %#v", out)
}
// Nack back into the queue
err = b.Nack(eval.ID, token)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := b.Outstanding(out.ID); ok {
t.Fatalf("should not be outstanding")
}
// Check the stats to ensure that it is waiting
stats := b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalWaiting != 1 {
t.Fatalf("bad: %#v", stats)
}
if stats.ByScheduler[eval.Type].Ready != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.ByScheduler[eval.Type].Unacked != 0 {
t.Fatalf("bad: %#v", stats)
}
// Now wait for it to be re-enqueued
testutil.WaitForResult(func() (bool, error) {
stats = b.Stats()
if stats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
return false, fmt.Errorf("bad: %#v", stats)
}
if stats.TotalWaiting != 0 {
return false, fmt.Errorf("bad: %#v", stats)
}
if stats.ByScheduler[eval.Type].Ready != 1 {
return false, fmt.Errorf("bad: %#v", stats)
}
if stats.ByScheduler[eval.Type].Unacked != 0 {
return false, fmt.Errorf("bad: %#v", stats)
}
return true, nil
}, func(e error) {
t.Fatal(e)
})
// Dequeue should work again
out2, token2, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out2 != eval {
t.Fatalf("bad : %#v", out2)
}
if token2 == token {
t.Fatalf("should get a new token")
}
// Capture the time
start := time.Now()
// Nack back into the queue
err = b.Nack(eval.ID, token2)
if err != nil {
t.Fatalf("err: %v", err)
}
// Now wait for it to be re-enqueued
testutil.WaitForResult(func() (bool, error) {
stats = b.Stats()
if stats.TotalReady != 1 {
return false, fmt.Errorf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
return false, fmt.Errorf("bad: %#v", stats)
}
if stats.TotalWaiting != 0 {
return false, fmt.Errorf("bad: %#v", stats)
}
if stats.ByScheduler[eval.Type].Ready != 1 {
return false, fmt.Errorf("bad: %#v", stats)
}
if stats.ByScheduler[eval.Type].Unacked != 0 {
return false, fmt.Errorf("bad: %#v", stats)
}
return true, nil
}, func(e error) {
t.Fatal(e)
})
delay := time.Now().Sub(start)
if delay < b.subsequentNackDelay {
t.Fatalf("bad: delay was %v; want at least %v", delay, b.subsequentNackDelay)
}
// Dequeue should work again
out3, token3, err := b.Dequeue(defaultSched, time.Second)
if err != nil {
t.Fatalf("err: %v", err)
}
if out3 != eval {
t.Fatalf("bad : %#v", out3)
}
if token3 == token || token3 == token2 {
t.Fatalf("should get a new token")
}
// Ack finally
err = b.Ack(eval.ID, token3)
if err != nil {
t.Fatalf("err: %v", err)
}
if _, ok := b.Outstanding(out.ID); ok {
t.Fatalf("should not be outstanding")
}
// Check the stats
stats = b.Stats()
if stats.TotalReady != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.TotalUnacked != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.ByScheduler[eval.Type].Ready != 0 {
t.Fatalf("bad: %#v", stats)
}
if stats.ByScheduler[eval.Type].Unacked != 0 {
t.Fatalf("bad: %#v", stats)
}
}
func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)
@@ -472,7 +645,7 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) {
func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
b := testBroker(t, 0)
b.SetEnabled(true)
- NUM := 100
+ NUM := 1000
for i := 0; i < NUM; i++ {
eval1 := mock.Eval()
@@ -503,7 +676,7 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) {
// This will fail randomly at times. It is very hard to
// test deterministically that it's acting randomly.
- if counter >= 25 || counter <= -25 {
+ if counter >= 250 || counter <= -250 {
t.Fatalf("unlikely sequence: %d", counter)
}
}
@@ -584,7 +757,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) {
// Ensure we nack in a timely manner
func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
- b := testBroker(t, 5*time.Millisecond)
+ b := testBroker(t, 50*time.Millisecond)
b.SetEnabled(true)
// Enqueue
@@ -601,8 +774,8 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
t.Fatalf("bad: %v", out)
}
- // Reset in 2 milliseconds
- time.Sleep(2 * time.Millisecond)
+ // Reset in 20 milliseconds
+ time.Sleep(20 * time.Millisecond)
if err := b.OutstandingReset(out.ID, token); err != nil {
t.Fatalf("err: %v", err)
}
@@ -618,13 +791,13 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) {
}
// Check the nack timer
- if diff := end.Sub(start); diff < 7*time.Millisecond {
+ if diff := end.Sub(start); diff < 75*time.Millisecond {
t.Fatalf("bad: %#v", diff)
}
}
func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
- b := testBroker(t, 5*time.Millisecond)
+ b := testBroker(t, 50*time.Millisecond)
b.SetEnabled(true)
// Enqueue
@@ -641,14 +814,14 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
t.Fatalf("bad: %v", out)
}
- // Pause in 2 milliseconds
- time.Sleep(2 * time.Millisecond)
+ // Pause in 20 milliseconds
+ time.Sleep(20 * time.Millisecond)
if err := b.PauseNackTimeout(out.ID, token); err != nil {
t.Fatalf("err: %v", err)
}
go func() {
- time.Sleep(2 * time.Millisecond)
+ time.Sleep(20 * time.Millisecond)
if err := b.ResumeNackTimeout(out.ID, token); err != nil {
t.Fatalf("err: %v", err)
}
@@ -665,7 +838,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) {
}
// Check the nack timer
- if diff := end.Sub(start); diff < 9*time.Millisecond {
+ if diff := end.Sub(start); diff < 95*time.Millisecond {
t.Fatalf("bad: %#v", diff)
}
}
@@ -820,7 +993,7 @@ func TestEvalBroker_Wait(t *testing.T) {
}
// Let the wait elapse
- time.Sleep(15 * time.Millisecond)
+ time.Sleep(20 * time.Millisecond)
// Verify ready
stats = b.Stats()
@@ -976,14 +1149,20 @@ func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) {
}
// Check stats again as this should cause the re-enqueued one to be dropped
- stats = b.Stats()
- if stats.TotalReady != 1 {
- t.Fatalf("bad: %#v", stats)
- }
- if stats.TotalUnacked != 0 {
- t.Fatalf("bad: %#v", stats)
- }
- if len(b.requeue) != 0 {
- t.Fatalf("bad: %#v", b.requeue)
- }
+ testutil.WaitForResult(func() (bool, error) {
+ stats = b.Stats()
+ if stats.TotalReady != 1 {
+ return false, fmt.Errorf("bad: %#v", stats)
+ }
+ if stats.TotalUnacked != 0 {
+ return false, fmt.Errorf("bad: %#v", stats)
+ }
+ if len(b.requeue) != 0 {
+ return false, fmt.Errorf("bad: %#v", b.requeue)
+ }
+ return true, nil
+ }, func(e error) {
+ t.Fatal(e)
+ })
}
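Finally, a hypothetical consumer loop, not Nomad's actual worker, to show the caller-visible effect of the change: a Nacked evaluation simply reappears on a later Dequeue once its delay elapses, so callers need no retry or back-off logic of their own. workerLoop and the process callback are invented for this sketch; the broker methods are the ones exercised by the tests above.

```go
package nomad

import (
	"time"

	"github.com/hashicorp/nomad/nomad/structs"
)

// workerLoop is a hypothetical consumer of the eval broker, included only
// to illustrate the caller-visible effect of the Nack delay.
func workerLoop(b *EvalBroker, schedulers []string, process func(*structs.Evaluation) error) {
	for {
		eval, token, err := b.Dequeue(schedulers, 10*time.Second)
		if err != nil || eval == nil {
			continue // dequeue timed out; just try again
		}
		if err := process(eval); err != nil {
			// Hand the evaluation back. The broker re-enqueues it after the
			// compounding delay, or parks it on the failed queue once the
			// delivery limit is hit, so no back-off is needed here.
			b.Nack(eval.ID, token)
			continue
		}
		b.Ack(eval.ID, token)
	}
}
```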