diff --git a/scheduler/select.go b/scheduler/select.go index f5b25e244..133156048 100644 --- a/scheduler/select.go +++ b/scheduler/select.go @@ -3,18 +3,24 @@ package scheduler // LimitIterator is a RankIterator used to limit the number of options // that are returned before we artificially end the stream. type LimitIterator struct { - ctx Context - source RankIterator - limit int - seen int + ctx Context + source RankIterator + limit int + maxSkip int + scoreThreshold float64 + seen int + skippedNodes []*RankedNode + skippedNodeIndex int } // NewLimitIterator is returns a LimitIterator with a fixed limit of returned options -func NewLimitIterator(ctx Context, source RankIterator, limit int) *LimitIterator { +func NewLimitIterator(ctx Context, source RankIterator, limit int, scoreThreshold float64, maxSkip int) *LimitIterator { iter := &LimitIterator{ - ctx: ctx, - source: source, - limit: limit, + ctx: ctx, + source: source, + limit: limit, + maxSkip: maxSkip, + scoreThreshold: scoreThreshold, } return iter } @@ -27,19 +33,41 @@ func (iter *LimitIterator) Next() *RankedNode { if iter.seen == iter.limit { return nil } - - option := iter.source.Next() + option := iter.nextOption() if option == nil { return nil } + if len(iter.skippedNodes) < iter.maxSkip { + // Try skipping ahead up to maxSkip to find an option with score lesser than the threshold + for option != nil && option.Score <= iter.scoreThreshold && len(iter.skippedNodes) < iter.maxSkip { + iter.skippedNodes = append(iter.skippedNodes, option) + option = iter.source.Next() + } + } iter.seen += 1 + if option == nil { // Didn't find anything, so use the skipped nodes instead + return iter.nextOption() + } return option } +// nextOption uses the iterator's list of skipped nodes if the source iterator is exhausted +func (iter *LimitIterator) nextOption() *RankedNode { + sourceOption := iter.source.Next() + if sourceOption == nil && iter.skippedNodeIndex < len(iter.skippedNodes) { + skippedOption := iter.skippedNodes[iter.skippedNodeIndex] + iter.skippedNodeIndex += 1 + return skippedOption + } + return sourceOption +} + func (iter *LimitIterator) Reset() { iter.source.Reset() iter.seen = 0 + iter.skippedNodes = nil + iter.skippedNodeIndex = 0 } // MaxScoreIterator is a RankIterator used to return only a single result diff --git a/scheduler/select_test.go b/scheduler/select_test.go index 1c85c8dcb..531f684fb 100644 --- a/scheduler/select_test.go +++ b/scheduler/select_test.go @@ -4,6 +4,8 @@ import ( "testing" "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/stretchr/testify/require" ) func TestLimitIterator(t *testing.T) { @@ -24,7 +26,7 @@ func TestLimitIterator(t *testing.T) { } static := NewStaticRankIterator(ctx, nodes) - limit := NewLimitIterator(ctx, static, 1) + limit := NewLimitIterator(ctx, static, 1, 0, 2) limit.SetLimit(2) out := collectRanked(limit) @@ -50,6 +52,244 @@ func TestLimitIterator(t *testing.T) { } } +func TestLimitIterator_ScoreThreshold(t *testing.T) { + _, ctx := testContext(t) + type testCase struct { + desc string + nodes []*RankedNode + expectedOut []*RankedNode + threshold float64 + limit int + maxSkip int + } + + var nodes []*structs.Node + for i := 0; i < 10; i++ { + nodes = append(nodes, mock.Node()) + } + + testCases := []testCase{ + { + desc: "Skips one low scoring node", + nodes: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + { + Node: nodes[1], + Score: 2, + }, + { + Node: nodes[2], + Score: 3, + }, + }, + expectedOut: []*RankedNode{ + { + Node: nodes[1], + Score: 2, + }, + { + Node: nodes[2], + Score: 3, + }, + }, + threshold: -1, + limit: 2, + maxSkip: 2, + }, + { + desc: "Skips maxSkip scoring nodes", + nodes: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + { + Node: nodes[1], + Score: -2, + }, + { + Node: nodes[2], + Score: 3, + }, + { + Node: nodes[3], + Score: 4, + }, + }, + expectedOut: []*RankedNode{ + { + Node: nodes[2], + Score: 3, + }, + { + Node: nodes[3], + Score: 4, + }, + }, + threshold: -1, + limit: 2, + maxSkip: 2, + }, + { + desc: "maxSkip limit reached", + nodes: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + { + Node: nodes[1], + Score: -6, + }, + { + Node: nodes[2], + Score: -3, + }, + { + Node: nodes[3], + Score: -4, + }, + }, + expectedOut: []*RankedNode{ + { + Node: nodes[2], + Score: -3, + }, + { + Node: nodes[3], + Score: -4, + }, + }, + threshold: -1, + limit: 2, + maxSkip: 2, + }, + { + desc: "draw both from skipped nodes", + nodes: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + { + Node: nodes[1], + Score: -6, + }, + }, + expectedOut: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + { + Node: nodes[1], + Score: -6, + }, + }, + threshold: -1, + limit: 2, + maxSkip: 2, + }, { + desc: "one node above threshold, one skipped node", + nodes: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + { + Node: nodes[1], + Score: 5, + }, + }, + expectedOut: []*RankedNode{ + { + Node: nodes[1], + Score: 5, + }, + { + Node: nodes[0], + Score: -1, + }, + }, + threshold: -1, + limit: 2, + maxSkip: 2, + }, + { + desc: "low scoring nodes interspersed", + nodes: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + { + Node: nodes[1], + Score: 5, + }, + { + Node: nodes[2], + Score: -2, + }, + { + Node: nodes[3], + Score: 2, + }, + }, + expectedOut: []*RankedNode{ + { + Node: nodes[1], + Score: 5, + }, + { + Node: nodes[3], + Score: 2, + }, + }, + threshold: -1, + limit: 2, + maxSkip: 2, + }, + { + desc: "only one node, score below threshold", + nodes: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + }, + expectedOut: []*RankedNode{ + { + Node: nodes[0], + Score: -1, + }, + }, + threshold: -1, + limit: 2, + maxSkip: 2, + }, + } + + for _, tc := range testCases { + t.Run(tc.desc, func(t *testing.T) { + static := NewStaticRankIterator(ctx, tc.nodes) + + limit := NewLimitIterator(ctx, static, 1, 0, 2) + limit.SetLimit(2) + out := collectRanked(limit) + require := require.New(t) + require.Equal(tc.expectedOut, out) + + limit.Reset() + require.Equal(0, limit.skippedNodeIndex) + require.Equal(0, len(limit.skippedNodes)) + }) + } + +} + func TestMaxScoreIterator(t *testing.T) { _, ctx := testContext(t) nodes := []*RankedNode{ diff --git a/scheduler/stack.go b/scheduler/stack.go index 16a982b7a..d7435c0a8 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -17,8 +17,17 @@ const ( // serviceJobAntiAffinityPenalty but for batch type jobs. batchJobAntiAffinityPenalty = 10.0 - // previousFailedAllocNodePenalty is a scoring penalty for nodes that a failed allocation was previously run on + // previousFailedAllocNodePenalty is a scoring penalty for nodes + // that a failed allocation was previously run on previousFailedAllocNodePenalty = 50.0 + + // skipScoreThreshold is a threshold used in the limit iterator to skip nodes + // that have a score lower than this. This threshold ensures skipping nodes + // that have more than one anti affinity penalty (job and node) applied to them + skipScoreThreshold = -250.0 + + // maxSkip limits the number of nodes that can be skipped in the limit iterator + maxSkip = 3 ) // Stack is a chained collection of iterators. The stack is used to @@ -123,7 +132,7 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { s.nodeAntiAff = NewNodeAntiAffinityIterator(ctx, s.jobAntiAff, previousFailedAllocNodePenalty) // Apply a limit function. This is to avoid scanning *every* possible node. - s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2) + s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2, skipScoreThreshold, maxSkip) // Select the node with the maximum score for placement s.maxScore = NewMaxScoreIterator(ctx, s.limit)