Implement support for even spread across datacenters, with unit test

This commit is contained in:
Preetha Appan
2018-07-26 17:14:27 -05:00
parent 56de0d0a11
commit 7a5791f39e
3 changed files with 277 additions and 24 deletions

View File

@@ -234,9 +234,15 @@ func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string
if !ok {
return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0
}
combinedUse := p.GetCombinedUseMap()
usedCount := combinedUse[nValue]
return nValue, "", usedCount
}
// combine the counts of how many times the property has been used by
// existing and proposed allocations
// GetCombinedUseMap counts how many times the property has been used by
// existing and proposed allocations. It also takes into account any stopped
// allocations
func (p *propertySet) GetCombinedUseMap() map[string]uint64 {
combinedUse := make(map[string]uint64, helper.IntMax(len(p.existingValues), len(p.proposedValues)))
for _, usedValues := range []map[string]uint64{p.existingValues, p.proposedValues} {
for propertyValue, usedCount := range usedValues {
@@ -259,9 +265,7 @@ func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string
combinedUse[propertyValue] = 0
}
}
usedCount := combinedUse[nValue]
return nValue, "", usedCount
return combinedUse
}
// filterAllocs filters a set of allocations to just be those that are running

View File

@@ -8,6 +8,18 @@ const (
// ImplicitTarget is used to represent any remaining attribute values
// when target percentages don't add up to 100
ImplicitTarget = "*"
// evenSpreadBoost is a positive boost used when the job has
// even spread over an attribute. Any nodes whose attribute value is
// equal to the minimum count over all possible attributes gets boosted
evenSpreadBoost = 0.01
// evenSpreadPenality is a penalty used when the job has
// even spread over an attribute. Any nodes whose attribute value is
// greater than the minimum count over all possible attributes gets this penalty.
// This is to ensure that other nodes are preferred when one of the values
// has a larger number of allocations
evenSpreadPenalty = -0.5
)
// SpreadIterator is used to spread allocations across a specified attribute
@@ -119,25 +131,33 @@ func (iter *SpreadIterator) Next() *RankedNode {
}
spreadAttributeMap := iter.tgSpreadInfo[tgName]
spreadDetails := spreadAttributeMap[pset.targetAttribute]
// Get the desired count
desiredCount, ok := spreadDetails.desiredCounts[nValue]
if !ok {
// See if there is an implicit target
desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget]
if !ok {
// The desired count for this attribute is zero if it gets here
// don't boost the score
continue
}
}
if float64(usedCount) < desiredCount {
// Calculate the relative weight of this specific spread attribute
spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
// Score Boost is proportional the difference between current and desired count
// It is multiplied with the spread weight to account for cases where the job has
// more than one spread attribute
scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
if len(spreadDetails.desiredCounts) == 0 {
// When desired counts map is empty the user didn't specify any targets
// Treat this as a special case
scoreBoost := evenSpreadScoreBoost(pset, option.Node)
totalSpreadScore += scoreBoost
} else {
// Get the desired count
desiredCount, ok := spreadDetails.desiredCounts[nValue]
if !ok {
// See if there is an implicit target
desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget]
if !ok {
// The desired count for this attribute is zero if it gets here
// don't boost the score
continue
}
}
if float64(usedCount) < desiredCount {
// Calculate the relative weight of this specific spread attribute
spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
// Score Boost is proportional the difference between current and desired count
// It is multiplied with the spread weight to account for cases where the job has
// more than one spread attribute
scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
totalSpreadScore += scoreBoost
}
}
}
@@ -149,6 +169,48 @@ func (iter *SpreadIterator) Next() *RankedNode {
}
}
// evenSpreadScoreBoost is a scoring helper that calculates the score
// for the option when even spread is desired (all attribute values get equal preference)
func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
combinedUseMap := pset.GetCombinedUseMap()
if len(combinedUseMap) == 0 {
// Nothing placed yet, so return 0 as the score
return 0.0
}
// Get the nodes property value
nValue, ok := getProperty(option, pset.targetAttribute)
currentAttributeCount := uint64(0)
if ok {
currentAttributeCount = combinedUseMap[nValue]
}
minCount := uint64(0)
maxCount := uint64(0)
for _, value := range combinedUseMap {
if minCount == 0 || value < minCount {
minCount = value
}
if maxCount == 0 || value > maxCount {
maxCount = value
}
}
if currentAttributeCount < minCount {
// Small positive boost for attributes with min count
return evenSpreadBoost
} else if currentAttributeCount > minCount {
// Negative boost if attribute count is greater than minimum
// This is so that other nodes will get a preference over this one
return evenSpreadPenalty
} else {
// When min and max are same the current distribution is even
// so we penalize
if minCount == maxCount {
return evenSpreadPenalty
} else {
return evenSpreadBoost
}
}
}
// computeSpreadInfo computes and stores percentages and total values
// from all spreads that apply to a specific task group
func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
@@ -167,7 +229,8 @@ func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
si.desiredCounts[st.Value] = desiredCount
sumDesiredCounts += desiredCount
}
if sumDesiredCounts < float64(totalCount) {
// Account for remaining count only if there is any spread targets
if sumDesiredCounts > 0 && sumDesiredCounts < float64(totalCount) {
remainingCount := float64(totalCount) - sumDesiredCounts
si.desiredCounts[ImplicitTarget] = remainingCount
}

View File

@@ -245,3 +245,189 @@ func TestSpreadIterator_MultipleAttributes(t *testing.T) {
}
}
func TestSpreadIterator_EvenSpread(t *testing.T) {
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
if err := state.UpsertNode(uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 10
// Configure even spread across node.datacenter
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Nothing placed so both dc nodes get 0 as the score
expectedScores := map[string]float64{
"dc1": 0,
"dc2": 0,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
// Update the plan to add allocs to nodes in dc1
// After this step dc2 nodes should get boosted
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[2].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 with existing allocs to get a boost
// dc1 nodes are penalized because they have allocs
expectedScores = map[string]float64{
"dc1": -0.5,
"dc2": 0.01,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
// Update the plan to add more allocs to nodes in dc2
// After this step dc1 nodes should get boosted
ctx.plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[1].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[1].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 to be penalized because there are 3 allocs there now
// dc1 nodes are boosted because that has 2 allocs
expectedScores = map[string]float64{
"dc1": 0.01,
"dc2": -0.5,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
// Add another node in dc3
node := mock.Node()
node.Datacenter = "dc3"
if err := state.UpsertNode(uint64(1111), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
// Add another alloc to dc1, now its count matches dc2
ctx.plan.NodeAllocation[nodes[4].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[4].Node.ID,
},
}
// Reset scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect dc1 and dc2 to be penalized because they have 3 allocs
// dc3 should get a boost because it has 0 allocs
expectedScores = map[string]float64{
"dc1": -0.5,
"dc2": -0.5,
"dc3": 0.01,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
}