diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go
index 85fe45d62..8ac9b48c6 100644
--- a/scheduler/propertyset.go
+++ b/scheduler/propertyset.go
@@ -234,9 +234,15 @@ func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string
 	if !ok {
 		return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0
 	}
+	combinedUse := p.GetCombinedUseMap()
+	usedCount := combinedUse[nValue]
+	return nValue, "", usedCount
+}
 
-	// combine the counts of how many times the property has been used by
-	// existing and proposed allocations
+// GetCombinedUseMap counts how many times the property has been used by
+// existing and proposed allocations. It also takes into account any stopped
+// allocations
+func (p *propertySet) GetCombinedUseMap() map[string]uint64 {
 	combinedUse := make(map[string]uint64, helper.IntMax(len(p.existingValues), len(p.proposedValues)))
 	for _, usedValues := range []map[string]uint64{p.existingValues, p.proposedValues} {
 		for propertyValue, usedCount := range usedValues {
@@ -259,9 +265,7 @@ func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string
 			combinedUse[propertyValue] = 0
 		}
 	}
-
-	usedCount := combinedUse[nValue]
-	return nValue, "", usedCount
+	return combinedUse
 }
 
 // filterAllocs filters a set of allocations to just be those that are running
diff --git a/scheduler/spread.go b/scheduler/spread.go
index f9acede39..2035ccf59 100644
--- a/scheduler/spread.go
+++ b/scheduler/spread.go
@@ -8,6 +8,18 @@ const (
 	// ImplicitTarget is used to represent any remaining attribute values
 	// when target percentages don't add up to 100
 	ImplicitTarget = "*"
+
+	// evenSpreadBoost is a positive boost used when the job has
+	// even spread over an attribute. Any node whose attribute value is
+	// equal to the minimum count over all possible attributes gets boosted
+	evenSpreadBoost = 0.01
+
+	// evenSpreadPenalty is a penalty used when the job has
+	// even spread over an attribute. Any node whose attribute value is
+	// greater than the minimum count over all possible attributes gets this penalty.
+	// This is to ensure that other nodes are preferred when one of the values
+	// has a larger number of allocations
+	evenSpreadPenalty = -0.5
 )
 
 // SpreadIterator is used to spread allocations across a specified attribute
@@ -119,25 +131,33 @@ func (iter *SpreadIterator) Next() *RankedNode {
 		}
 		spreadAttributeMap := iter.tgSpreadInfo[tgName]
 		spreadDetails := spreadAttributeMap[pset.targetAttribute]
-		// Get the desired count
-		desiredCount, ok := spreadDetails.desiredCounts[nValue]
-		if !ok {
-			// See if there is an implicit target
-			desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget]
-			if !ok {
-				// The desired count for this attribute is zero if it gets here
-				// don't boost the score
-				continue
-			}
-		}
-		if float64(usedCount) < desiredCount {
-			// Calculate the relative weight of this specific spread attribute
-			spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
-			// Score Boost is proportional the difference between current and desired count
-			// It is multiplied with the spread weight to account for cases where the job has
-			// more than one spread attribute
-			scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
+
+		if len(spreadDetails.desiredCounts) == 0 {
+			// When the desired counts map is empty the user didn't specify any targets
+			// Treat this as a special case
+			scoreBoost := evenSpreadScoreBoost(pset, option.Node)
 			totalSpreadScore += scoreBoost
+		} else {
+			// Get the desired count
+			desiredCount, ok := spreadDetails.desiredCounts[nValue]
+			if !ok {
+				// See if there is an implicit target
+				desiredCount, ok = spreadDetails.desiredCounts[ImplicitTarget]
+				if !ok {
+					// The desired count for this attribute is zero if it gets here
+					// don't boost the score
+					continue
+				}
+			}
+			if float64(usedCount) < desiredCount {
+				// Calculate the relative weight of this specific spread attribute
+				spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
+				// Score Boost is proportional the difference between current and desired count
+				// It is multiplied with the spread weight to account for cases where the job has
+				// more than one spread attribute
+				scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
+				totalSpreadScore += scoreBoost
+			}
 		}
 	}
 
@@ -149,6 +169,48 @@ func (iter *SpreadIterator) Next() *RankedNode {
 	}
 }
 
+// evenSpreadScoreBoost is a scoring helper that calculates the score
+// for the option when even spread is desired (all attribute values get equal preference)
+func evenSpreadScoreBoost(pset *propertySet, option *structs.Node) float64 {
+	combinedUseMap := pset.GetCombinedUseMap()
+	if len(combinedUseMap) == 0 {
+		// Nothing placed yet, so return 0 as the score
+		return 0.0
+	}
+	// Get the node's property value
+	nValue, ok := getProperty(option, pset.targetAttribute)
+	currentAttributeCount := uint64(0)
+	if ok {
+		currentAttributeCount = combinedUseMap[nValue]
+	}
+	minCount := uint64(0)
+	maxCount := uint64(0)
+	for _, value := range combinedUseMap {
+		if minCount == 0 || value < minCount {
+			minCount = value
+		}
+		if maxCount == 0 || value > maxCount {
+			maxCount = value
+		}
+	}
+	if currentAttributeCount < minCount {
+		// Small positive boost for attributes with min count
+		return evenSpreadBoost
+	} else if currentAttributeCount > minCount {
+		// Negative boost if attribute count is greater than minimum
+		// This is so that other nodes will get a preference over this one
+		return evenSpreadPenalty
+	} else {
+		// When min and max are the same the current distribution is even,
+		// so we penalize
+		if minCount == maxCount {
+			return evenSpreadPenalty
+		} else {
+			return evenSpreadBoost
+		}
+	}
+}
+
 // computeSpreadInfo computes and stores percentages and total values
 // from all spreads that apply to a specific task group
 func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
@@ -167,7 +229,8 @@ func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
 			si.desiredCounts[st.Value] = desiredCount
 			sumDesiredCounts += desiredCount
 		}
-		if sumDesiredCounts < float64(totalCount) {
+		// Account for the remaining count only if there are any spread targets
+		if sumDesiredCounts > 0 && sumDesiredCounts < float64(totalCount) {
 			remainingCount := float64(totalCount) - sumDesiredCounts
 			si.desiredCounts[ImplicitTarget] = remainingCount
 		}
diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go
index e546a2e6d..e6457d56e 100644
--- a/scheduler/spread_test.go
+++ b/scheduler/spread_test.go
@@ -245,3 +245,189 @@ func TestSpreadIterator_MultipleAttributes(t *testing.T) {
 	}
 
 }
+
+func TestSpreadIterator_EvenSpread(t *testing.T) {
+	state, ctx := testContext(t)
+	dcs := []string{"dc1", "dc2", "dc1", "dc2", "dc1", "dc2", "dc2", "dc1", "dc1", "dc1"}
+	var nodes []*RankedNode
+
+	// Add these nodes to the state store
+	for i, dc := range dcs {
+		node := mock.Node()
+		node.Datacenter = dc
+		if err := state.UpsertNode(uint64(100+i), node); err != nil {
+			t.Fatalf("failed to upsert node: %v", err)
+		}
+		nodes = append(nodes, &RankedNode{Node: node})
+	}
+
+	static := NewStaticRankIterator(ctx, nodes)
+	job := mock.Job()
+	tg := job.TaskGroups[0]
+	job.TaskGroups[0].Count = 10
+
+	// Configure even spread across node.datacenter
+	spread := &structs.Spread{
+		Weight:    100,
+		Attribute: "${node.datacenter}",
+	}
+	tg.Spreads = []*structs.Spread{spread}
+	spreadIter := NewSpreadIterator(ctx, static)
+	spreadIter.SetJob(job)
+	spreadIter.SetTaskGroup(tg)
+
+	scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
+
+	out := collectRanked(scoreNorm)
+
+	// Nothing placed yet, so nodes in both dcs get 0 as the score
+	expectedScores := map[string]float64{
+		"dc1": 0,
+		"dc2": 0,
+	}
+	for _, rn := range out {
+		require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+	}
+
+	// Update the plan to add allocs to nodes in dc1
+	// After this step dc2 nodes should get boosted
+	ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[0].Node.ID,
+		},
+	}
+	ctx.plan.NodeAllocation[nodes[2].Node.ID] = []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[2].Node.ID,
+		},
+	}
+
+	// Reset the scores
+	for _, node := range nodes {
+		node.Scores = nil
+		node.FinalScore = 0
+	}
+	static = NewStaticRankIterator(ctx, nodes)
+	spreadIter = NewSpreadIterator(ctx, static)
+	spreadIter.SetJob(job)
+	spreadIter.SetTaskGroup(tg)
+	scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
+	out = collectRanked(scoreNorm)
+
+	// Expect nodes in dc2 to get a boost because dc2 has no allocs yet
+	// dc1 nodes are penalized because they have allocs
+	expectedScores = map[string]float64{
+		"dc1": -0.5,
+		"dc2": 0.01,
+	}
+	for _, rn := range out {
+		require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+	}
+
+	// Update the plan to add more allocs to nodes in dc2
+	// After this step dc1 nodes should get boosted
+	ctx.plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[1].Node.ID,
+		},
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[1].Node.ID,
+		},
+	}
+	ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[3].Node.ID,
+		},
+	}
+
+	// Reset the scores
+	for _, node := range nodes {
+		node.Scores = nil
+		node.FinalScore = 0
+	}
+	static = NewStaticRankIterator(ctx, nodes)
+	spreadIter = NewSpreadIterator(ctx, static)
+	spreadIter.SetJob(job)
+	spreadIter.SetTaskGroup(tg)
+	scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
+	out = collectRanked(scoreNorm)
+
+	// Expect nodes in dc2 to be penalized because there are 3 allocs there now
+	// dc1 nodes are boosted because dc1 has 2 allocs
+	expectedScores = map[string]float64{
+		"dc1": 0.01,
+		"dc2": -0.5,
+	}
+	for _, rn := range out {
+		require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+	}
+
+	// Add another node in dc3
+	node := mock.Node()
+	node.Datacenter = "dc3"
+	if err := state.UpsertNode(uint64(1111), node); err != nil {
+		t.Fatalf("failed to upsert node: %v", err)
+	}
+	nodes = append(nodes, &RankedNode{Node: node})
+
+	// Add another alloc to dc1, now its count matches dc2
+	ctx.plan.NodeAllocation[nodes[4].Node.ID] = []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[4].Node.ID,
+		},
+	}
+
+	// Reset scores
+	for _, node := range nodes {
+		node.Scores = nil
+		node.FinalScore = 0
+	}
+	static = NewStaticRankIterator(ctx, nodes)
+	spreadIter = NewSpreadIterator(ctx, static)
+	spreadIter.SetJob(job)
+	spreadIter.SetTaskGroup(tg)
+	scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
+	out = collectRanked(scoreNorm)
+
+	// Expect dc1 and dc2 to be penalized because they both have 3 allocs
+	// dc3 should get a boost because it has 0 allocs
+	expectedScores = map[string]float64{
+		"dc1": -0.5,
+		"dc2": -0.5,
+		"dc3": 0.01,
+	}
+	for _, rn := range out {
+		require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+	}
+
+}
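To make the even-spread scoring rule above easier to follow, here is a small self-contained Go sketch of the same decision logic; it is illustrative only and not part of the diff. The evenSpreadScore function and its plain map argument are simplified stand-ins for the propertySet.GetCombinedUseMap and getProperty plumbing, while the boost and penalty constants mirror the ones introduced in spread.go. With the counts used in the test (dc1=2, dc2=3), it yields the same boost and penalty values the test asserts.

package main

import "fmt"

// Constants mirroring the values introduced in spread.go.
const (
	evenSpreadBoost   = 0.01
	evenSpreadPenalty = -0.5
)

// evenSpreadScore is a simplified, standalone version of the even-spread rule.
// counts maps each attribute value (e.g. a datacenter) to its combined
// existing+proposed allocation count; value is the candidate node's attribute value.
func evenSpreadScore(counts map[string]uint64, value string) float64 {
	if len(counts) == 0 {
		// Nothing placed yet, no preference either way.
		return 0.0
	}
	current := counts[value] // missing values count as 0, like a node with no allocs
	var minCount, maxCount uint64
	for _, c := range counts {
		if minCount == 0 || c < minCount {
			minCount = c
		}
		if maxCount == 0 || c > maxCount {
			maxCount = c
		}
	}
	switch {
	case current < minCount:
		return evenSpreadBoost // under-used value, prefer it
	case current > minCount:
		return evenSpreadPenalty // over-used value, steer away from it
	case minCount == maxCount:
		return evenSpreadPenalty // distribution is already even, no value is preferred
	default:
		return evenSpreadBoost // at the minimum while other values are higher
	}
}

func main() {
	counts := map[string]uint64{"dc1": 2, "dc2": 3}
	fmt.Println(evenSpreadScore(counts, "dc1")) // 0.01  (dc1 is at the minimum)
	fmt.Println(evenSpreadScore(counts, "dc2")) // -0.5  (dc2 has more allocs)
	fmt.Println(evenSpreadScore(counts, "dc3")) // 0.01  (dc3 has no allocs yet)
}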