From fd697272a75d05541823f4e3cfcf88066d4b445f Mon Sep 17 00:00:00 2001 From: Preetha Appan Date: Tue, 17 Jul 2018 17:25:38 -0500 Subject: [PATCH] Implement spread iterator that scores according to percentage of desired count in each target. Added this as a new step in the stack and some unit tests --- scheduler/generic_sched_test.go | 83 +++++++++++ scheduler/propertyset.go | 84 +++++++---- scheduler/spread.go | 155 ++++++++++++++++++++ scheduler/spread_test.go | 249 ++++++++++++++++++++++++++++++++ scheduler/stack.go | 10 +- 5 files changed, 547 insertions(+), 34 deletions(-) create mode 100644 scheduler/spread.go create mode 100644 scheduler/spread_test.go diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index d72241784..fd8d81ec9 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -602,6 +602,89 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T) h.AssertEvalStatus(t, structs.EvalStatusComplete) } +// Test job registration with spread configured +func TestServiceSched_Spread(t *testing.T) { + h := NewHarness(t) + assert := assert.New(t) + + // Create a job that uses spread over data center + job := mock.Job() + job.Datacenters = []string{"dc1", "dc2"} + job.TaskGroups[0].Count = 10 + job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads, + &structs.Spread{ + Attribute: "${node.datacenter}", + Weight: 100, + SpreadTarget: []*structs.SpreadTarget{ + { + Value: "dc1", + Percent: 70, + }, + { + Value: "dc2", + Percent: 30, + }, + }, + }) + assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob") + + // Create some nodes, half in dc2 + var nodes []*structs.Node + nodeMap := make(map[string]*structs.Node) + for i := 0; i < 6; i++ { + node := mock.Node() + if i%2 == 0 { + node.Datacenter = "dc2" + } + nodes = append(nodes, node) + assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode") + nodeMap[node.ID] = node + } + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + assert.Nil(h.Process(NewServiceScheduler, eval), "Process") + + // Ensure a single plan + assert.Len(h.Plans, 1, "Number of plans") + plan := h.Plans[0] + + // Ensure the plan doesn't have annotations. + assert.Nil(plan.Annotations, "Plan.Annotations") + + // Ensure the eval hasn't spawned blocked eval + assert.Len(h.CreateEvals, 0, "Created Evals") + + // Ensure the plan allocated + var planned []*structs.Allocation + dcAllocsMap := make(map[string]int) + for nodeId, allocList := range plan.NodeAllocation { + planned = append(planned, allocList...) 
+		dc := nodeMap[nodeId].Datacenter
+		c := dcAllocsMap[dc]
+		c += len(allocList)
+		dcAllocsMap[dc] = c
+	}
+	assert.Len(planned, 10, "Planned Allocations")
+
+	expectedCounts := make(map[string]int)
+	expectedCounts["dc1"] = 7
+	expectedCounts["dc2"] = 3
+	require.Equal(t, expectedCounts, dcAllocsMap)
+
+	h.AssertEvalStatus(t, structs.EvalStatusComplete)
+}
+
 func TestServiceSched_JobRegister_Annotate(t *testing.T) {
 	h := NewHarness(t)
 
diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go
index c335ec340..8e664254e 100644
--- a/scheduler/propertyset.go
+++ b/scheduler/propertyset.go
@@ -23,8 +23,8 @@ type propertySet struct {
 	// taskGroup is optionally set if the constraint is for a task group
 	taskGroup string
 
-	// constraint is the constraint this property set is checking
-	constraint *structs.Constraint
+	// targetAttribute is the attribute this property set is checking
+	targetAttribute string
 
 	// allowedCount is the allowed number of allocations that can have the
 	// distinct property
@@ -60,7 +60,7 @@ func NewPropertySet(ctx Context, job *structs.Job) *propertySet {
 	return p
 }
 
-// SetJobConstraint is used to parameterize the property set for a
+// SetJobConstraint parameterizes the property set for a
 // distinct_property constraint set at the job level.
 func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) {
 	p.setConstraint(constraint, "")
@@ -75,14 +75,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup
 
 // setConstraint is a shared helper for setting a job or task group constraint.
 func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup string) {
-	// Store that this is for a task group
-	if taskGroup != "" {
-		p.taskGroup = taskGroup
-	}
-
-	// Store the constraint
-	p.constraint = constraint
-
+	allowedCount := uint64(0)
 	// Determine the number of allowed allocations with the property.
 	if v := constraint.RTarget; v != "" {
 		c, err := strconv.ParseUint(v, 10, 64)
@@ -92,14 +85,35 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st
 			return
 		}
 
-		p.allowedCount = c
+		allowedCount = c
 	} else {
-		p.allowedCount = 1
+		allowedCount = 1
 	}
+	p.setPropertySetInner(constraint.LTarget, allowedCount, taskGroup)
+}
+
+// SetTargetAttribute is used to populate this property set without also storing an allowed count.
+// This is used when evaluating spread stanzas.
+func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) {
+	p.setPropertySetInner(targetAttribute, 0, taskGroup)
+}
+
+// setPropertySetInner is a shared helper for setting a job or task group attribute and allowedCount.
+// allowedCount can be zero when this is used in evaluating spread stanzas.
+func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount uint64, taskGroup string) {
+	// Store that this is for a task group
+	if taskGroup != "" {
+		p.taskGroup = taskGroup
+	}
+
+	// Store the target attribute
+	p.targetAttribute = targetAttribute
+
+	p.allowedCount = allowedCount
 
 	// Determine the number of existing allocations that are using a property
 	// value
-	p.populateExisting(constraint)
+	p.populateExisting(targetAttribute)
 
 	// Populate the proposed when setting the constraint. We do this because
 	// when detecting if we can inplace update an allocation we stage an
@@ -110,7 +124,7 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st
 
 // populateExisting is a helper shared when setting the constraint to populate
 // the existing values.
-func (p *propertySet) populateExisting(constraint *structs.Constraint) { +func (p *propertySet) populateExisting(targetAttribute string) { // Retrieve all previously placed allocations ws := memdb.NewWatchSet() allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false) @@ -193,15 +207,32 @@ func (p *propertySet) PopulateProposed() { // placements. If the option does not satisfy the constraints an explanation is // given. func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) { + nValue, errorMsg, usedCount := p.UsedCount(option, tg) + if errorMsg != "" { + return false, errorMsg + } + // The property value has been used but within the number of allowed + // allocations. + if usedCount < p.allowedCount { + return true, "" + } + + return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.targetAttribute, nValue, usedCount) +} + +// UsedCount returns the number of times the value of the attribute being tracked by this +// property set is used across current and proposed allocations. It also returns the resolved +// attribute value for the node, and an error message if it couldn't be resolved correctly +func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string, uint64) { // Check if there was an error building if p.errorBuilding != nil { - return false, p.errorBuilding.Error() + return "", p.errorBuilding.Error(), 0 } // Get the nodes property value - nValue, ok := getProperty(option, p.constraint.LTarget) + nValue, ok := getProperty(option, p.targetAttribute) if !ok { - return false, fmt.Sprintf("missing property %q", p.constraint.LTarget) + return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0 } // combine the counts of how many times the property has been used by @@ -229,19 +260,8 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin } } - usedCount, used := combinedUse[nValue] - if !used { - // The property value has never been used so we can use it. - return true, "" - } - - // The property value has been used but within the number of allowed - // allocations. 
-	if usedCount < p.allowedCount {
-		return true, ""
-	}
-
-	return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.constraint.LTarget, nValue, usedCount)
+	usedCount := combinedUse[nValue]
+	return nValue, "", usedCount
 }
 
 // filterAllocs filters a set of allocations to just be those that are running
@@ -298,7 +318,7 @@ func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map
 	properties map[string]uint64) {
 
 	for _, alloc := range allocs {
-		nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget)
+		nProperty, ok := getProperty(nodes[alloc.NodeID], p.targetAttribute)
 		if !ok {
 			continue
 		}
diff --git a/scheduler/spread.go b/scheduler/spread.go
new file mode 100644
index 000000000..db59b3322
--- /dev/null
+++ b/scheduler/spread.go
@@ -0,0 +1,155 @@
+package scheduler
+
+import (
+	"github.com/hashicorp/nomad/nomad/structs"
+)
+
+// SpreadIterator is used to spread allocations across a specified attribute
+// according to preset weights
+type SpreadIterator struct {
+	ctx               Context
+	source            RankIterator
+	job               *structs.Job
+	tg                *structs.TaskGroup
+	jobSpreads        []*structs.Spread
+	tgSpreadInfo      map[string]spreadAttributeMap
+	sumSpreadWeights  int
+	hasSpread         bool
+	groupPropertySets map[string][]*propertySet
+}
+
+type spreadAttributeMap map[string]*spreadInfo
+
+type spreadInfo struct {
+	sumWeight     uint32
+	weight        int
+	desiredCounts map[string]float64
+}
+
+func NewSpreadIterator(ctx Context, source RankIterator) *SpreadIterator {
+	iter := &SpreadIterator{
+		ctx:               ctx,
+		source:            source,
+		groupPropertySets: make(map[string][]*propertySet),
+		tgSpreadInfo:      make(map[string]spreadAttributeMap),
+	}
+	return iter
+}
+
+func (iter *SpreadIterator) Reset() {
+	iter.source.Reset()
+	for _, sets := range iter.groupPropertySets {
+		for _, ps := range sets {
+			ps.PopulateProposed()
+		}
+	}
+}
+
+func (iter *SpreadIterator) SetJob(job *structs.Job) {
+	iter.job = job
+	if job.Spreads != nil {
+		iter.jobSpreads = job.Spreads
+	}
+}
+
+func (iter *SpreadIterator) SetTaskGroup(tg *structs.TaskGroup) {
+	iter.tg = tg
+
+	// Build the property set at the task group level
+	if _, ok := iter.groupPropertySets[tg.Name]; !ok {
+		// First add property sets that are at the job level for this task group
+		for _, spread := range iter.jobSpreads {
+			pset := NewPropertySet(iter.ctx, iter.job)
+			pset.SetTargetAttribute(spread.Attribute, tg.Name)
+			iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
+		}
+
+		// Include property sets at the task group level
+		for _, spread := range tg.Spreads {
+			pset := NewPropertySet(iter.ctx, iter.job)
+			pset.SetTargetAttribute(spread.Attribute, tg.Name)
+			iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
+		}
+	}
+
+	// Check if there are any spreads to apply
+	iter.hasSpread = len(iter.groupPropertySets[tg.Name]) != 0
+
+	// Build tgSpreadInfo at the task group level
+	if _, ok := iter.tgSpreadInfo[tg.Name]; !ok {
+		iter.computeSpreadInfo(tg)
+	}
+
+}
+
+func (iter *SpreadIterator) hasSpreads() bool {
+	return iter.hasSpread
+}
+
+func (iter *SpreadIterator) Next() *RankedNode {
+	for {
+		option := iter.source.Next()
+
+		// Hot path if there is nothing to check
+		if option == nil || !iter.hasSpreads() {
+			return option
+		}
+
+		tgName := iter.tg.Name
+		propertySets := iter.groupPropertySets[tgName]
+		// Iterate over each spread attribute's property set and add a weighted score
+		totalSpreadScore := 0.0
+		for _, pset := range propertySets {
+			nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName)
+			if errorMsg != "" {
+				// Skip if there were errors resolving this attribute when computing used counts
+				continue
+			}
+			spreadAttributeMap := iter.tgSpreadInfo[tgName]
+			spreadDetails := spreadAttributeMap[pset.targetAttribute]
+			// Get the desired count
+			desiredCount, ok := spreadDetails.desiredCounts[nValue]
+			if !ok {
+				// Warn about missing ratio
+				iter.ctx.Logger().Printf("[WARN] sched: missing desired distribution percentage for attribute value %v in spread stanza for job %v", nValue, iter.job.ID)
+				continue
+			}
+			if float64(usedCount) < desiredCount {
+				// Calculate the relative weight of this specific spread attribute
+				spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
+				// The score boost is proportional to the difference between the current and desired count.
+				// It is multiplied by the spread weight to account for cases where the job has
+				// more than one spread attribute
+				scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
+				totalSpreadScore += scoreBoost
+			}
+		}
+
+		if totalSpreadScore != 0.0 {
+			option.Scores = append(option.Scores, totalSpreadScore)
+			iter.ctx.Metrics().ScoreNode(option.Node, "allocation-spread", totalSpreadScore)
+		}
+		return option
+	}
+}
+
+// computeSpreadInfo computes and stores desired counts and total weights
+// from all spreads that apply to a specific task group
+func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
+	spreadInfos := make(spreadAttributeMap, len(tg.Spreads))
+	totalCount := tg.Count
+	for _, spread := range tg.Spreads {
+		sumWeight := uint32(0)
+		for _, st := range spread.SpreadTarget {
+			sumWeight += st.Percent
+		}
+		si := &spreadInfo{sumWeight: sumWeight, weight: spread.Weight, desiredCounts: make(map[string]float64)}
+		for _, st := range spread.SpreadTarget {
+			desiredCount := (float64(st.Percent) / float64(sumWeight)) * float64(totalCount)
+			si.desiredCounts[st.Value] = desiredCount
+		}
+		spreadInfos[spread.Attribute] = si
+		iter.sumSpreadWeights += spread.Weight
+	}
+	iter.tgSpreadInfo[tg.Name] = spreadInfos
+}
diff --git a/scheduler/spread_test.go b/scheduler/spread_test.go
new file mode 100644
index 000000000..3ec6cc8c2
--- /dev/null
+++ b/scheduler/spread_test.go
@@ -0,0 +1,249 @@
+package scheduler
+
+import (
+	"testing"
+
+	"fmt"
+
+	"github.com/hashicorp/nomad/helper/uuid"
+	"github.com/hashicorp/nomad/nomad/mock"
+	"github.com/hashicorp/nomad/nomad/structs"
+	"github.com/stretchr/testify/require"
+)
+
+func TestSpreadIterator_SingleAttribute(t *testing.T) {
+	state, ctx := testContext(t)
+	dcs := []string{"dc1", "dc2", "dc1", "dc1"}
+	var nodes []*RankedNode
+
+	// Add these nodes to the state store
+	for i, dc := range dcs {
+		node := mock.Node()
+		node.Datacenter = dc
+		if err := state.UpsertNode(uint64(100+i), node); err != nil {
+			t.Fatalf("failed to upsert node: %v", err)
+		}
+		nodes = append(nodes, &RankedNode{Node: node})
+	}
+
+	static := NewStaticRankIterator(ctx, nodes)
+
+	job := mock.Job()
+	tg := job.TaskGroups[0]
+	job.TaskGroups[0].Count = 5
+	// add allocs to nodes in dc1
+	upserting := []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			EvalID:    uuid.Generate(),
+			NodeID:    nodes[0].Node.ID,
+		},
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			EvalID:    uuid.Generate(),
+			NodeID:    nodes[2].Node.ID,
+		},
+	}
+
+	if err := state.UpsertAllocs(1000, upserting); err != nil {
+		t.Fatalf("failed to UpsertAllocs: %v", err)
+	}
+
+	spread := &structs.Spread{
+		Weight:    100,
+		Attribute: "${node.datacenter}",
+		SpreadTarget: []*structs.SpreadTarget{
+			{
+				Value:   "dc1",
+				Percent: 80,
+			},
+			{
+				Value:   "dc2",
+				Percent: 20,
+			},
+		},
+	}
+	tg.Spreads = []*structs.Spread{spread}
+	spreadIter := NewSpreadIterator(ctx, static)
+	spreadIter.SetJob(job)
+	spreadIter.SetTaskGroup(tg)
+
+	scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
+
+	out := collectRanked(scoreNorm)
+
+	// Expect nodes in dc1 with existing allocs to get a boost
+	// Boost should be ((desiredCount-actual)/desiredCount)*spreadWeight
+	// For this test, that becomes dc1 = (4-2)/4 = 0.5 and dc2 = (1-0)/1 = 1.0
+	expectedScores := map[string]float64{
+		"dc1": 0.5,
+		"dc2": 1.0,
+	}
+	for _, rn := range out {
+		require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+	}
+
+	// Update the plan to add more allocs to nodes in dc1
+	// After this step there are enough allocs to meet the desired count in dc1
+	ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[0].Node.ID,
+		},
+		// Should be ignored as it is a different job.
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: "bbb",
+			JobID:     "ignore 2",
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[0].Node.ID,
+		},
+	}
+	ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			NodeID:    nodes[3].Node.ID,
+		},
+	}
+
+	// Reset the scores
+	for _, node := range nodes {
+		node.Scores = nil
+		node.FinalScore = 0
+	}
+	static = NewStaticRankIterator(ctx, nodes)
+	spreadIter = NewSpreadIterator(ctx, static)
+	spreadIter.SetJob(job)
+	spreadIter.SetTaskGroup(tg)
+	scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
+	out = collectRanked(scoreNorm)
+
+	// Expect nodes in dc2 to get a boost; dc1 nodes are not boosted because
+	// there are now enough allocs to meet the desired count in dc1
+	expectedScores = map[string]float64{
+		"dc1": 0,
+		"dc2": 1.0,
+	}
+	for _, rn := range out {
+		require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
+	}
+}
+
+func TestSpreadIterator_MultipleAttributes(t *testing.T) {
+	state, ctx := testContext(t)
+	dcs := []string{"dc1", "dc2", "dc1", "dc1"}
+	rack := []string{"r1", "r1", "r2", "r2"}
+	var nodes []*RankedNode
+
+	// Add these nodes to the state store
+	for i, dc := range dcs {
+		node := mock.Node()
+		node.Datacenter = dc
+		node.Meta["rack"] = rack[i]
+		if err := state.UpsertNode(uint64(100+i), node); err != nil {
+			t.Fatalf("failed to upsert node: %v", err)
+		}
+		nodes = append(nodes, &RankedNode{Node: node})
+	}
+
+	static := NewStaticRankIterator(ctx, nodes)
+
+	job := mock.Job()
+	tg := job.TaskGroups[0]
+	job.TaskGroups[0].Count = 5
+	// add allocs to nodes in dc1
+	upserting := []*structs.Allocation{
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			EvalID:    uuid.Generate(),
+			NodeID:    nodes[0].Node.ID,
+		},
+		{
+			Namespace: structs.DefaultNamespace,
+			TaskGroup: tg.Name,
+			JobID:     job.ID,
+			Job:       job,
+			ID:        uuid.Generate(),
+			EvalID:    uuid.Generate(),
+			NodeID:    nodes[2].Node.ID,
+		},
+	}
+
+	if err := state.UpsertAllocs(1000, upserting); err != nil {
+		t.Fatalf("failed to UpsertAllocs: %v", err)
+	}
+
+	spread1 := &structs.Spread{
+		Weight:    100,
+		Attribute: "${node.datacenter}",
+		SpreadTarget: []*structs.SpreadTarget{
+			{
+				Value:   "dc1",
+				Percent: 60,
+			},
+			{
+				Value:   "dc2",
+				Percent: 40,
+			},
+		},
+	}
+
+	spread2 := &structs.Spread{
+		Weight:    50,
+		Attribute: "${meta.rack}",
+		SpreadTarget: []*structs.SpreadTarget{
+			{
+				Value:   "r1",
+				Percent: 40,
+			},
+			{
+				Value:   "r2",
+				Percent: 60,
+			},
+		},
+	}
+
+	tg.Spreads = []*structs.Spread{spread1, spread2}
+	spreadIter := NewSpreadIterator(ctx, static)
+	spreadIter.SetJob(job)
+	spreadIter.SetTaskGroup(tg)
+
+	scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
+
+	out := collectRanked(scoreNorm)
+
+	// Scores come from combining two different spread factors
+	// The second node should have the highest score because it has no allocs and it's in dc2/r1
+	expectedScores := map[string]float64{
+		nodes[0].Node.ID: 0.389,
+		nodes[1].Node.ID: 0.833,
+		nodes[2].Node.ID: 0.444,
+		nodes[3].Node.ID: 0.444,
+	}
+	for _, rn := range out {
+		require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore))
+	}
+
+}
diff --git a/scheduler/stack.go b/scheduler/stack.go
index 15b8538e5..46191c41d 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -57,6 +57,7 @@ type GenericStack struct {
 	limit             *LimitIterator
 	maxScore          *MaxScoreIterator
 	nodeAffinity      *NodeAffinityIterator
+	spread            *SpreadIterator
 	scoreNorm         *ScoreNormalizationIterator
 }
 
@@ -117,7 +118,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
 
 	s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
 
-	s.scoreNorm = NewScoreNormalizationIterator(ctx, s.nodeAffinity)
+	s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
+
+	s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)
 
 	// Apply a limit function. This is to avoid scanning *every* possible node.
 	s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
@@ -156,6 +159,7 @@ func (s *GenericStack) SetJob(job *structs.Job) {
 	s.binPack.SetPriority(job.Priority)
 	s.jobAntiAff.SetJob(job)
 	s.nodeAffinity.SetJob(job)
+	s.spread.SetJob(job)
 	s.ctx.Eligibility().SetJob(job)
 
 	if contextual, ok := s.quota.(ContextualIterator); ok {
@@ -200,7 +204,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*R
 		s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs)
 	}
 	s.nodeAffinity.SetTaskGroup(tg)
-	if s.nodeAffinity.hasAffinities() {
+	s.spread.SetTaskGroup(tg)
+
+	if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
 		s.limit.SetLimit(math.MaxInt32)
 	}
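
Reviewer note, not part of the patch: the standalone sketch below illustrates how the boost computed in SpreadIterator.Next combines desired counts with each spread's share of the total weight. The numbers mirror TestSpreadIterator_MultipleAttributes above; the spreadBoost helper and its signature are hypothetical and only approximate the scoring path, they are not the patch's API.

package main

import "fmt"

// spreadBoost mirrors the scoring step in SpreadIterator.Next: when a target
// value is still under its desired count, the boost is the remaining fraction
// of the desired count, scaled by this spread's share of the total weight.
// Illustrative only; not part of the patch.
func spreadBoost(desired, used map[string]float64, weightShare float64, value string) float64 {
	d, ok := desired[value]
	if !ok || used[value] >= d {
		return 0
	}
	return ((d - used[value]) / d) * weightShare
}

func main() {
	// Task group count = 5, as in TestSpreadIterator_MultipleAttributes.
	// Spread 1: ${node.datacenter}, weight 100 of 150 total.
	dcDesired := map[string]float64{"dc1": 3, "dc2": 2} // 60% / 40% of 5
	dcUsed := map[string]float64{"dc1": 2, "dc2": 0}    // two existing allocs in dc1

	// Spread 2: ${meta.rack}, weight 50 of 150 total.
	rackDesired := map[string]float64{"r1": 2, "r2": 3} // 40% / 60% of 5
	rackUsed := map[string]float64{"r1": 1, "r2": 1}

	// Node in dc1/r1 (node 0 in the test): (1/3)*(100/150) + (1/2)*(50/150) = 0.389
	n0 := spreadBoost(dcDesired, dcUsed, 100.0/150.0, "dc1") +
		spreadBoost(rackDesired, rackUsed, 50.0/150.0, "r1")

	// Node in dc2/r1 (node 1 in the test): 1.0*(100/150) + (1/2)*(50/150) = 0.833
	n1 := spreadBoost(dcDesired, dcUsed, 100.0/150.0, "dc2") +
		spreadBoost(rackDesired, rackUsed, 50.0/150.0, "r1")

	fmt.Printf("dc1/r1 boost: %.3f\n", n0)
	fmt.Printf("dc2/r1 boost: %.3f\n", n1)
}

Running this prints 0.389 and 0.833, matching the expected FinalScore values for the dc1/r1 and dc2/r1 nodes in that test.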