diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 783ad84f0..3733a2884 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -28,6 +28,14 @@ var ( // ErrNackTimeoutReached is returned if an expired evaluation is reset ErrNackTimeoutReached = errors.New("evaluation nack timeout reached") + + // initialNackReenqueueDelay is the delay applied before re-enqueuing a + // Nacked evaluation for the first time + initialNackReenqueueDelay = time.Second + + // subsequentNackReenqueueDelay is a compounding delay applied on each + // subsequent Nack for an evaluation. + subsequentNackReenqueueDelay = 20 * time.Second ) // EvalBroker is used to manage brokering of evaluations. When an evaluation is @@ -76,6 +84,10 @@ type EvalBroker struct { // timeWait has evaluations that are waiting for time to elapse timeWait map[string]*time.Timer + // Nack delays are defaulted and are only made available for testing + initialNackDelay time.Duration + subsequentNackDelay time.Duration + l sync.RWMutex } @@ -100,18 +112,20 @@ func NewEvalBroker(timeout time.Duration, deliveryLimit int) (*EvalBroker, error return nil, fmt.Errorf("timeout cannot be negative") } b := &EvalBroker{ - nackTimeout: timeout, - deliveryLimit: deliveryLimit, - enabled: false, - stats: new(BrokerStats), - evals: make(map[string]int), - jobEvals: make(map[string]string), - blocked: make(map[string]PendingEvaluations), - ready: make(map[string]PendingEvaluations), - unack: make(map[string]*unackEval), - waiting: make(map[string]chan struct{}), - requeue: make(map[string]*structs.Evaluation), - timeWait: make(map[string]*time.Timer), + nackTimeout: timeout, + deliveryLimit: deliveryLimit, + enabled: false, + stats: new(BrokerStats), + evals: make(map[string]int), + jobEvals: make(map[string]string), + blocked: make(map[string]PendingEvaluations), + ready: make(map[string]PendingEvaluations), + unack: make(map[string]*unackEval), + waiting: make(map[string]chan struct{}), + requeue: make(map[string]*structs.Evaluation), + timeWait: make(map[string]*time.Timer), + initialNackDelay: initialNackReenqueueDelay, + subsequentNackDelay: subsequentNackReenqueueDelay, } b.stats.ByScheduler = make(map[string]*SchedulerStats) return b, nil @@ -187,17 +201,23 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) { // Check if we need to enforce a wait if eval.Wait > 0 { - timer := time.AfterFunc(eval.Wait, func() { - b.enqueueWaiting(eval) - }) - b.timeWait[eval.ID] = timer - b.stats.TotalWaiting += 1 + b.processWaitingEnqueue(eval) return } b.enqueueLocked(eval, eval.Type) } +// processWaitingEnqueue waits the given duration on the evaluation before +// enqueueing. +func (b *EvalBroker) processWaitingEnqueue(eval *structs.Evaluation) { + timer := time.AfterFunc(eval.Wait, func() { + b.enqueueWaiting(eval) + }) + b.timeWait[eval.ID] = timer + b.stats.TotalWaiting += 1 +} + // enqueueWaiting is used to enqueue a waiting evaluation func (b *EvalBroker) enqueueWaiting(eval *structs.Evaluation) { b.l.Lock() @@ -547,14 +567,40 @@ func (b *EvalBroker) Nack(evalID, token string) error { // Check if we've hit the delivery limit, and re-enqueue // in the failedQueue - if b.evals[evalID] >= b.deliveryLimit { + if dequeues := b.evals[evalID]; dequeues >= b.deliveryLimit { b.enqueueLocked(unack.Eval, failedQueue) } else { - b.enqueueLocked(unack.Eval, unack.Eval.Type) + e := unack.Eval + e.Wait = b.nackReenqueueDelay(e, dequeues) + + // See if there should be a delay before re-enqueuing + if e.Wait > 0 { + b.processWaitingEnqueue(e) + } else { + b.enqueueLocked(e, e.Type) + } } + return nil } +// nackReenqueueDelay is used to determine the delay that should be applied on +// the evaluation given the number of previous attempts +func (b *EvalBroker) nackReenqueueDelay(eval *structs.Evaluation, prevDequeues int) time.Duration { + var delay time.Duration + + switch { + case prevDequeues <= 0: + case prevDequeues == 1: + delay = b.initialNackDelay + default: + // For each subsequent nack compound a delay + delay = time.Duration(prevDequeues-1) * b.subsequentNackDelay + } + + return delay +} + // PauseNackTimeout is used to pause the Nack timeout for an eval that is making // progress but is in a potentially unbounded operation such as the plan queue. func (b *EvalBroker) PauseNackTimeout(evalID, token string) error { diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index ab6ecedf0..c263fb1ef 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1,11 +1,13 @@ package nomad import ( + "fmt" "testing" "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" ) var ( @@ -23,6 +25,11 @@ func testBroker(t *testing.T, timeout time.Duration) *EvalBroker { if err != nil { t.Fatalf("err: %v", err) } + + // Tune the Nack delay + b.initialNackDelay = 5 * time.Millisecond + b.subsequentNackDelay = 50 * time.Millisecond + return b } @@ -126,19 +133,28 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { } // Check the stats - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if stats.ByScheduler[eval.Type].Ready != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.ByScheduler[eval.Type].Unacked != 0 { - t.Fatalf("bad: %#v", stats) - } + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) // Dequeue should work again out2, token2, err := b.Dequeue(defaultSched, time.Second) @@ -192,6 +208,163 @@ func TestEvalBroker_Enqueue_Dequeue_Nack_Ack(t *testing.T) { } } +func TestEvalBroker_Nack_Delay(t *testing.T) { + b := testBroker(t, 0) + + // Enqueue, but broker is disabled! + b.SetEnabled(true) + eval := mock.Eval() + b.Enqueue(eval) + + // Dequeue should work + out, token, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out != eval { + t.Fatalf("bad : %#v", out) + } + + // Nack back into the queue + err = b.Nack(eval.ID, token) + if err != nil { + t.Fatalf("err: %v", err) + } + + if _, ok := b.Outstanding(out.ID); ok { + t.Fatalf("should not be outstanding") + } + + // Check the stats to ensure that it is waiting + stats := b.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.TotalWaiting != 1 { + t.Fatalf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + t.Fatalf("bad: %#v", stats) + } + + // Now wait for it to be re-enqueued + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) + + // Dequeue should work again + out2, token2, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out2 != eval { + t.Fatalf("bad : %#v", out2) + } + if token2 == token { + t.Fatalf("should get a new token") + } + + // Capture the time + start := time.Now() + + // Nack back into the queue + err = b.Nack(eval.ID, token2) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Now wait for it to be re-enqueued + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalWaiting != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) + + delay := time.Now().Sub(start) + if delay < b.subsequentNackDelay { + t.Fatalf("bad: delay was %v; want at least %v", delay, b.subsequentNackDelay) + } + + // Dequeue should work again + out3, token3, err := b.Dequeue(defaultSched, time.Second) + if err != nil { + t.Fatalf("err: %v", err) + } + if out3 != eval { + t.Fatalf("bad : %#v", out3) + } + if token3 == token || token3 == token2 { + t.Fatalf("should get a new token") + } + + // Ack finally + err = b.Ack(eval.ID, token3) + if err != nil { + t.Fatalf("err: %v", err) + } + + if _, ok := b.Outstanding(out.ID); ok { + t.Fatalf("should not be outstanding") + } + + // Check the stats + stats = b.Stats() + if stats.TotalReady != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Ready != 0 { + t.Fatalf("bad: %#v", stats) + } + if stats.ByScheduler[eval.Type].Unacked != 0 { + t.Fatalf("bad: %#v", stats) + } +} + func TestEvalBroker_Serialize_DuplicateJobID(t *testing.T) { b := testBroker(t, 0) b.SetEnabled(true) @@ -472,7 +645,7 @@ func TestEvalBroker_Dequeue_FIFO(t *testing.T) { func TestEvalBroker_Dequeue_Fairness(t *testing.T) { b := testBroker(t, 0) b.SetEnabled(true) - NUM := 100 + NUM := 1000 for i := 0; i < NUM; i++ { eval1 := mock.Eval() @@ -503,7 +676,7 @@ func TestEvalBroker_Dequeue_Fairness(t *testing.T) { // This will fail randomly at times. It is very hard to // test deterministically that its acting randomly. - if counter >= 25 || counter <= -25 { + if counter >= 250 || counter <= -250 { t.Fatalf("unlikely sequence: %d", counter) } } @@ -584,7 +757,7 @@ func TestEvalBroker_Nack_Timeout(t *testing.T) { // Ensure we nack in a timely manner func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { - b := testBroker(t, 5*time.Millisecond) + b := testBroker(t, 50*time.Millisecond) b.SetEnabled(true) // Enqueue @@ -601,8 +774,8 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { t.Fatalf("bad: %v", out) } - // Reset in 2 milliseconds - time.Sleep(2 * time.Millisecond) + // Reset in 20 milliseconds + time.Sleep(20 * time.Millisecond) if err := b.OutstandingReset(out.ID, token); err != nil { t.Fatalf("err: %v", err) } @@ -618,13 +791,13 @@ func TestEvalBroker_Nack_TimeoutReset(t *testing.T) { } // Check the nack timer - if diff := end.Sub(start); diff < 7*time.Millisecond { + if diff := end.Sub(start); diff < 75*time.Millisecond { t.Fatalf("bad: %#v", diff) } } func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { - b := testBroker(t, 5*time.Millisecond) + b := testBroker(t, 50*time.Millisecond) b.SetEnabled(true) // Enqueue @@ -641,14 +814,14 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { t.Fatalf("bad: %v", out) } - // Pause in 2 milliseconds - time.Sleep(2 * time.Millisecond) + // Pause in 20 milliseconds + time.Sleep(20 * time.Millisecond) if err := b.PauseNackTimeout(out.ID, token); err != nil { t.Fatalf("err: %v", err) } go func() { - time.Sleep(2 * time.Millisecond) + time.Sleep(20 * time.Millisecond) if err := b.ResumeNackTimeout(out.ID, token); err != nil { t.Fatalf("err: %v", err) } @@ -665,7 +838,7 @@ func TestEvalBroker_PauseResumeNackTimeout(t *testing.T) { } // Check the nack timer - if diff := end.Sub(start); diff < 9*time.Millisecond { + if diff := end.Sub(start); diff < 95*time.Millisecond { t.Fatalf("bad: %#v", diff) } } @@ -820,7 +993,7 @@ func TestEvalBroker_Wait(t *testing.T) { } // Let the wait elapse - time.Sleep(15 * time.Millisecond) + time.Sleep(20 * time.Millisecond) // Verify ready stats = b.Stats() @@ -976,14 +1149,20 @@ func TestEvalBroker_EnqueueAll_Requeue_Nack(t *testing.T) { } // Check stats again as this should cause the re-enqueued one to be dropped - stats = b.Stats() - if stats.TotalReady != 1 { - t.Fatalf("bad: %#v", stats) - } - if stats.TotalUnacked != 0 { - t.Fatalf("bad: %#v", stats) - } - if len(b.requeue) != 0 { - t.Fatalf("bad: %#v", b.requeue) - } + testutil.WaitForResult(func() (bool, error) { + stats = b.Stats() + if stats.TotalReady != 1 { + return false, fmt.Errorf("bad: %#v", stats) + } + if stats.TotalUnacked != 0 { + return false, fmt.Errorf("bad: %#v", stats) + } + if len(b.requeue) != 0 { + return false, fmt.Errorf("bad: %#v", b.requeue) + } + + return true, nil + }, func(e error) { + t.Fatal(e) + }) }