Limit iterator uses a score threshold and a maxSkip value to be able to skip lower scoring nodes

This commit is contained in:
Preetha Appan
2018-01-23 15:13:44 -06:00
parent c1f4066c56
commit 3429dfa716
3 changed files with 290 additions and 13 deletions

View File

@@ -3,18 +3,24 @@ package scheduler
// LimitIterator is a RankIterator used to limit the number of options
// that are returned before we artificially end the stream.
type LimitIterator struct {
ctx Context
source RankIterator
limit int
seen int
ctx Context
source RankIterator
limit int
maxSkip int
scoreThreshold float64
seen int
skippedNodes []*RankedNode
skippedNodeIndex int
}
// NewLimitIterator is returns a LimitIterator with a fixed limit of returned options
func NewLimitIterator(ctx Context, source RankIterator, limit int) *LimitIterator {
func NewLimitIterator(ctx Context, source RankIterator, limit int, scoreThreshold float64, maxSkip int) *LimitIterator {
iter := &LimitIterator{
ctx: ctx,
source: source,
limit: limit,
ctx: ctx,
source: source,
limit: limit,
maxSkip: maxSkip,
scoreThreshold: scoreThreshold,
}
return iter
}
@@ -27,19 +33,41 @@ func (iter *LimitIterator) Next() *RankedNode {
if iter.seen == iter.limit {
return nil
}
option := iter.source.Next()
option := iter.nextOption()
if option == nil {
return nil
}
if len(iter.skippedNodes) < iter.maxSkip {
// Try skipping ahead up to maxSkip to find an option with score lesser than the threshold
for option != nil && option.Score <= iter.scoreThreshold && len(iter.skippedNodes) < iter.maxSkip {
iter.skippedNodes = append(iter.skippedNodes, option)
option = iter.source.Next()
}
}
iter.seen += 1
if option == nil { // Didn't find anything, so use the skipped nodes instead
return iter.nextOption()
}
return option
}
// nextOption uses the iterator's list of skipped nodes if the source iterator is exhausted
func (iter *LimitIterator) nextOption() *RankedNode {
sourceOption := iter.source.Next()
if sourceOption == nil && iter.skippedNodeIndex < len(iter.skippedNodes) {
skippedOption := iter.skippedNodes[iter.skippedNodeIndex]
iter.skippedNodeIndex += 1
return skippedOption
}
return sourceOption
}
func (iter *LimitIterator) Reset() {
iter.source.Reset()
iter.seen = 0
iter.skippedNodes = nil
iter.skippedNodeIndex = 0
}
// MaxScoreIterator is a RankIterator used to return only a single result

View File

@@ -4,6 +4,8 @@ import (
"testing"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestLimitIterator(t *testing.T) {
@@ -24,7 +26,7 @@ func TestLimitIterator(t *testing.T) {
}
static := NewStaticRankIterator(ctx, nodes)
limit := NewLimitIterator(ctx, static, 1)
limit := NewLimitIterator(ctx, static, 1, 0, 2)
limit.SetLimit(2)
out := collectRanked(limit)
@@ -50,6 +52,244 @@ func TestLimitIterator(t *testing.T) {
}
}
func TestLimitIterator_ScoreThreshold(t *testing.T) {
_, ctx := testContext(t)
type testCase struct {
desc string
nodes []*RankedNode
expectedOut []*RankedNode
threshold float64
limit int
maxSkip int
}
var nodes []*structs.Node
for i := 0; i < 10; i++ {
nodes = append(nodes, mock.Node())
}
testCases := []testCase{
{
desc: "Skips one low scoring node",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
{
Node: nodes[1],
Score: 2,
},
{
Node: nodes[2],
Score: 3,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 2,
},
{
Node: nodes[2],
Score: 3,
},
},
threshold: -1,
limit: 2,
maxSkip: 2,
},
{
desc: "Skips maxSkip scoring nodes",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
{
Node: nodes[1],
Score: -2,
},
{
Node: nodes[2],
Score: 3,
},
{
Node: nodes[3],
Score: 4,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[2],
Score: 3,
},
{
Node: nodes[3],
Score: 4,
},
},
threshold: -1,
limit: 2,
maxSkip: 2,
},
{
desc: "maxSkip limit reached",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
{
Node: nodes[1],
Score: -6,
},
{
Node: nodes[2],
Score: -3,
},
{
Node: nodes[3],
Score: -4,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[2],
Score: -3,
},
{
Node: nodes[3],
Score: -4,
},
},
threshold: -1,
limit: 2,
maxSkip: 2,
},
{
desc: "draw both from skipped nodes",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
{
Node: nodes[1],
Score: -6,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
{
Node: nodes[1],
Score: -6,
},
},
threshold: -1,
limit: 2,
maxSkip: 2,
}, {
desc: "one node above threshold, one skipped node",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
{
Node: nodes[1],
Score: 5,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 5,
},
{
Node: nodes[0],
Score: -1,
},
},
threshold: -1,
limit: 2,
maxSkip: 2,
},
{
desc: "low scoring nodes interspersed",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
{
Node: nodes[1],
Score: 5,
},
{
Node: nodes[2],
Score: -2,
},
{
Node: nodes[3],
Score: 2,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 5,
},
{
Node: nodes[3],
Score: 2,
},
},
threshold: -1,
limit: 2,
maxSkip: 2,
},
{
desc: "only one node, score below threshold",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[0],
Score: -1,
},
},
threshold: -1,
limit: 2,
maxSkip: 2,
},
}
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
static := NewStaticRankIterator(ctx, tc.nodes)
limit := NewLimitIterator(ctx, static, 1, 0, 2)
limit.SetLimit(2)
out := collectRanked(limit)
require := require.New(t)
require.Equal(tc.expectedOut, out)
limit.Reset()
require.Equal(0, limit.skippedNodeIndex)
require.Equal(0, len(limit.skippedNodes))
})
}
}
func TestMaxScoreIterator(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{

View File

@@ -17,8 +17,17 @@ const (
// serviceJobAntiAffinityPenalty but for batch type jobs.
batchJobAntiAffinityPenalty = 10.0
// previousFailedAllocNodePenalty is a scoring penalty for nodes that a failed allocation was previously run on
// previousFailedAllocNodePenalty is a scoring penalty for nodes
// that a failed allocation was previously run on
previousFailedAllocNodePenalty = 50.0
// skipScoreThreshold is a threshold used in the limit iterator to skip nodes
// that have a score lower than this. This threshold ensures skipping nodes
// that have more than one anti affinity penalty (job and node) applied to them
skipScoreThreshold = -250.0
// maxSkip limits the number of nodes that can be skipped in the limit iterator
maxSkip = 3
)
// Stack is a chained collection of iterators. The stack is used to
@@ -123,7 +132,7 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
s.nodeAntiAff = NewNodeAntiAffinityIterator(ctx, s.jobAntiAff, previousFailedAllocNodePenalty)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2)
s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2, skipScoreThreshold, maxSkip)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)