From c841dc01a9c1d57f5e98b3bb44f2176df802643c Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Mon, 5 Mar 2018 21:46:54 -0600 Subject: [PATCH] Address some code review comments --- nomad/delayheap/delay_heap.go | 29 +++++++++++++++-------------- nomad/delayheap/delay_heap_test.go | 4 ++-- nomad/eval_broker.go | 25 ++++++++++++++++--------- nomad/eval_broker_test.go | 5 +++-- nomad/structs/structs.go | 2 +- 5 files changed, 37 insertions(+), 28 deletions(-) diff --git a/nomad/delayheap/delay_heap.go b/nomad/delayheap/delay_heap.go index 50ca56674..4e8ba669c 100644 --- a/nomad/delayheap/delay_heap.go +++ b/nomad/delayheap/delay_heap.go @@ -9,21 +9,22 @@ import ( ) // DelayHeap wraps a heap and gives operations other than Push/Pop. -// The inner heap is sorted by the time in the WaitUntil field of DelayHeapNode +// The inner heap is sorted by the time in the WaitUntil field of delayHeapNode type DelayHeap struct { - index map[structs.NamespacedID]*DelayHeapNode + index map[structs.NamespacedID]*delayHeapNode heap delayedHeapImp } +// HeapNode is an interface type implemented by objects stored in the DelayHeap type HeapNode interface { - Data() interface{} - ID() string - Namespace() string + Data() interface{} // The data object + ID() string // ID of the object, used in conjunction with namespace for deduplication + Namespace() string // Namespace of the object, can be empty } -// DelayHeapNode encapsulates the node stored in DelayHeap +// delayHeapNode encapsulates the node stored in DelayHeap // WaitUntil is used as the sorting criteria -type DelayHeapNode struct { +type delayHeapNode struct { // Node is the data object stored in the delay heap Node HeapNode // WaitUntil is the time delay associated with the node @@ -33,7 +34,7 @@ type DelayHeapNode struct { index int } -type delayedHeapImp []*DelayHeapNode +type delayedHeapImp []*delayHeapNode func (h delayedHeapImp) Len() int { return len(h) @@ -63,7 +64,7 @@ func (h delayedHeapImp) Swap(i, j int) { } func (h *delayedHeapImp) Push(x interface{}) { - node := x.(*DelayHeapNode) + node := x.(*delayHeapNode) n := len(*h) node.index = n *h = append(*h, node) @@ -80,7 +81,7 @@ func (h *delayedHeapImp) Pop() interface{} { func NewDelayHeap() *DelayHeap { return &DelayHeap{ - index: make(map[structs.NamespacedID]*DelayHeapNode), + index: make(map[structs.NamespacedID]*delayHeapNode), heap: make(delayedHeapImp, 0), } } @@ -94,18 +95,18 @@ func (p *DelayHeap) Push(dataNode HeapNode, next time.Time) error { return fmt.Errorf("node %q (%s) already exists", dataNode.ID(), dataNode.Namespace()) } - delayHeapNode := &DelayHeapNode{dataNode, next, 0} + delayHeapNode := &delayHeapNode{dataNode, next, 0} p.index[tuple] = delayHeapNode heap.Push(&p.heap, delayHeapNode) return nil } -func (p *DelayHeap) Pop() *DelayHeapNode { +func (p *DelayHeap) Pop() *delayHeapNode { if len(p.heap) == 0 { return nil } - delayHeapNode := heap.Pop(&p.heap).(*DelayHeapNode) + delayHeapNode := heap.Pop(&p.heap).(*delayHeapNode) tuple := structs.NamespacedID{ ID: delayHeapNode.Node.ID(), Namespace: delayHeapNode.Node.Namespace(), @@ -114,7 +115,7 @@ func (p *DelayHeap) Pop() *DelayHeapNode { return delayHeapNode } -func (p *DelayHeap) Peek() *DelayHeapNode { +func (p *DelayHeap) Peek() *delayHeapNode { if len(p.heap) == 0 { return nil } diff --git a/nomad/delayheap/delay_heap_test.go b/nomad/delayheap/delay_heap_test.go index 3bb87e377..3b44acc19 100644 --- a/nomad/delayheap/delay_heap_test.go +++ b/nomad/delayheap/delay_heap_test.go @@ -105,8 +105,8 @@ func TestDelayHeap_Update(t *testing.T) { } -func getHeapEntries(delayHeap *DelayHeap, now time.Time) []*DelayHeapNode { - var entries []*DelayHeapNode +func getHeapEntries(delayHeap *DelayHeap, now time.Time) []*delayHeapNode { + var entries []*delayHeapNode for node := delayHeap.Pop(); node != nil; { entries = append(entries, node) node = delayHeap.Pop() diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index 27efd4fe7..01f597104 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -161,14 +161,17 @@ func (b *EvalBroker) Enabled() bool { // should only be enabled on the active leader. func (b *EvalBroker) SetEnabled(enabled bool) { b.l.Lock() + prevEnabled := b.enabled b.enabled = enabled - // start the go routine for delayed evals - ctx, cancel := context.WithCancel(context.Background()) - b.delayedEvalCancelFunc = cancel - go b.runDelayedEvalsWatcher(ctx) + if !prevEnabled && enabled { + // start the go routine for delayed evals + ctx, cancel := context.WithCancel(context.Background()) + b.delayedEvalCancelFunc = cancel + go b.runDelayedEvalsWatcher(ctx) + } b.l.Unlock() if !enabled { - b.Flush() + b.flush() } } @@ -230,6 +233,7 @@ func (b *EvalBroker) processEnqueue(eval *structs.Evaluation, token string) { if !eval.WaitUntil.IsZero() { b.delayHeap.Push(&evalWrapper{eval}, eval.WaitUntil) + b.stats.TotalWaiting += 1 // Signal an update. select { case b.delayedEvalsUpdateCh <- struct{}{}: @@ -675,7 +679,7 @@ func (b *EvalBroker) ResumeNackTimeout(evalID, token string) error { } // Flush is used to clear the state of the broker -func (b *EvalBroker) Flush() { +func (b *EvalBroker) flush() { b.l.Lock() defer b.l.Unlock() @@ -733,11 +737,11 @@ func (d *evalWrapper) Namespace() string { return d.eval.Namespace } -// runDelayedEvalsWatcher is a long-lived function that waits till a job's periodic spec is met and -// then creates an evaluation to run the job. +// runDelayedEvalsWatcher is a long-lived function that waits till a time deadline is met for +// pending evaluations before enqueuing them func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) { var timerChannel <-chan time.Time - for b.enabled { + for { eval, waitUntil := b.nextDelayedEval() if waitUntil.IsZero() { timerChannel = nil @@ -752,7 +756,10 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) { case <-timerChannel: // remove from the heap since we can enqueue it now b.delayHeap.Remove(&evalWrapper{eval}) + b.l.Lock() + b.stats.TotalWaiting -= 1 b.enqueueLocked(eval, eval.Type) + b.l.Unlock() case <-b.delayedEvalsUpdateCh: continue } diff --git a/nomad/eval_broker_test.go b/nomad/eval_broker_test.go index 87f47936e..8f7f71510 100644 --- a/nomad/eval_broker_test.go +++ b/nomad/eval_broker_test.go @@ -1141,7 +1141,7 @@ func TestEvalBroker_Wait(t *testing.T) { } } -// Test that delayed evaluations work as expected +// Ensure that delayed evaluations work as expected func TestEvalBroker_WaitUntil(t *testing.T) { t.Parallel() require := require.New(t) @@ -1166,7 +1166,7 @@ func TestEvalBroker_WaitUntil(t *testing.T) { eval3.WaitUntil = now.Add(20 * time.Millisecond) eval3.CreateIndex = 1 b.Enqueue(eval3) - + require.Equal(3, b.stats.TotalWaiting) // sleep enough for two evals to be ready time.Sleep(200 * time.Millisecond) @@ -1184,6 +1184,7 @@ func TestEvalBroker_WaitUntil(t *testing.T) { out, _, err = b.Dequeue(defaultSched, 2*time.Second) require.Nil(err) require.Equal(eval1, out) + require.Equal(0, b.stats.TotalWaiting) } // Ensure that priority is taken into account when enqueueing many evaluations. diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b51428e3f..a35574735 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2825,7 +2825,7 @@ func (r *ReschedulePolicy) Validate() error { } - //Validate Interval and other delay parameters if attempts are limited + // Validate Interval and other delay parameters if attempts are limited if !r.Unlimited { if r.Interval.Nanoseconds() < ReschedulePolicyMinInterval.Nanoseconds() { multierror.Append(&mErr, fmt.Errorf("Interval cannot be less than %v (got %v)", ReschedulePolicyMinInterval, r.Interval))