diff --git a/scheduler/context.go b/scheduler/context.go index f0ae1e64e..569a1347f 100644 --- a/scheduler/context.go +++ b/scheduler/context.go @@ -34,8 +34,8 @@ type Context interface { // RegexpCache is a cache of regular expressions RegexpCache() map[string]*regexp.Regexp - // ConstraintCache is a cache of version constraints - ConstraintCache() map[string]version.Constraints + // VersionConstraintCache is a cache of version constraints + VersionConstraintCache() map[string]version.Constraints // Eligibility returns a tracker for node eligibility in the context of the // eval. @@ -54,7 +54,8 @@ func (e *EvalCache) RegexpCache() map[string]*regexp.Regexp { } return e.reCache } -func (e *EvalCache) ConstraintCache() map[string]version.Constraints { + +func (e *EvalCache) VersionConstraintCache() map[string]version.Constraints { if e.constraintCache == nil { e.constraintCache = make(map[string]version.Constraints) } diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 4b1a99571..9fa42d592 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -403,11 +403,11 @@ func (c *ConstraintChecker) Feasible(option *structs.Node) bool { func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, option *structs.Node) bool { // Resolve the targets - lVal, ok := resolveConstraintTarget(constraint.LTarget, option) + lVal, ok := resolveTarget(constraint.LTarget, option) if !ok { return false } - rVal, ok := resolveConstraintTarget(constraint.RTarget, option) + rVal, ok := resolveTarget(constraint.RTarget, option) if !ok { return false } @@ -416,8 +416,8 @@ func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, opti return checkConstraint(c.ctx, constraint.Operand, lVal, rVal) } -// resolveConstraintTarget is used to resolve the LTarget and RTarget of a Constraint -func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bool) { +// resolveTarget is used to resolve the LTarget and RTarget of a Constraint +func resolveTarget(target string, node *structs.Node) (interface{}, bool) { // If no prefix, this must be a literal value if !strings.HasPrefix(target, "${") { return target, true @@ -470,16 +470,28 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool { case "<", "<=", ">", ">=": return checkLexicalOrder(operand, lVal, rVal) case structs.ConstraintVersion: - return checkVersionConstraint(ctx, lVal, rVal) + return checkVersionMatch(ctx, lVal, rVal) case structs.ConstraintRegex: - return checkRegexpConstraint(ctx, lVal, rVal) + return checkRegexpMatch(ctx, lVal, rVal) case structs.ConstraintSetContains: - return checkSetContainsConstraint(ctx, lVal, rVal) + return checkSetContainsAll(ctx, lVal, rVal) default: return false } } +// checkAffinity checks if a specific affinity is satisfied +func checkAffinity(ctx Context, operand string, lVal, rVal interface{}) bool { + switch operand { + case structs.AffinitySetContainsAny: + return checkSetContainsAny(ctx, lVal, rVal) + case structs.AffinitySetContainsAll: + return checkSetContainsAll(ctx, lVal, rVal) + default: + return checkConstraint(ctx, operand, lVal, rVal) + } +} + // checkLexicalOrder is used to check for lexical ordering func checkLexicalOrder(op string, lVal, rVal interface{}) bool { // Ensure the values are strings @@ -506,9 +518,9 @@ func checkLexicalOrder(op string, lVal, rVal interface{}) bool { } } -// checkVersionConstraint is used to compare a version on the +// checkVersionMatch is used to compare a version on the // left hand side 
with a set of constraints on the right hand side -func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool { +func checkVersionMatch(ctx Context, lVal, rVal interface{}) bool { // Parse the version var versionStr string switch v := lVal.(type) { @@ -533,7 +545,7 @@ func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool { } // Check the cache for a match - cache := ctx.ConstraintCache() + cache := ctx.VersionConstraintCache() constraints := cache[constraintStr] // Parse the constraints @@ -549,9 +561,9 @@ func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool { return constraints.Check(vers) } -// checkRegexpConstraint is used to compare a value on the +// checkRegexpMatch is used to compare a value on the // left hand side with a regexp on the right hand side -func checkRegexpConstraint(ctx Context, lVal, rVal interface{}) bool { +func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { @@ -582,9 +594,9 @@ func checkRegexpConstraint(ctx Context, lVal, rVal interface{}) bool { return re.MatchString(lStr) } -// checkSetContainsConstraint is used to see if the left hand side contains the +// checkSetContainsAll is used to see if the left hand side contains the // string on the right hand side -func checkSetContainsConstraint(ctx Context, lVal, rVal interface{}) bool { +func checkSetContainsAll(ctx Context, lVal, rVal interface{}) bool { // Ensure left-hand is string lStr, ok := lVal.(string) if !ok { @@ -614,6 +626,38 @@ func checkSetContainsConstraint(ctx Context, lVal, rVal interface{}) bool { return true } +// checkSetContainsAny is used to see if the left hand side contains any +// values on the right hand side +func checkSetContainsAny(ctx Context, lVal, rVal interface{}) bool { + // Ensure left-hand is string + lStr, ok := lVal.(string) + if !ok { + return false + } + + // RHS must be a string + rStr, ok := rVal.(string) + if !ok { + return false + } + + input := strings.Split(lStr, ",") + lookup := make(map[string]struct{}, len(input)) + for _, in := range input { + cleaned := strings.TrimSpace(in) + lookup[cleaned] = struct{}{} + } + + for _, r := range strings.Split(rStr, ",") { + cleaned := strings.TrimSpace(r) + if _, ok := lookup[cleaned]; ok { + return true + } + } + + return false +} + // FeasibilityWrapper is a FeasibleIterator which wraps both job and task group // FeasibilityCheckers in which feasibility checking can be skipped if the // computed node class has previously been marked as eligible or ineligible. 
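Reviewer note: the feasible.go changes above come down to all-of versus any-of membership over comma-separated value lists — checkSetContainsAll (the renamed checkSetContainsConstraint) requires every right-hand value to appear on the left, while the new checkSetContainsAny, used by the set-contains-any affinity operand, requires at least one match, and checkAffinity falls back to checkConstraint for every other operand. Below is a minimal standalone sketch of those set semantics; the helper names (toSet, containsAll, containsAny) are illustrative only and not part of this diff:

```go
package main

import (
	"fmt"
	"strings"
)

// toSet parses a comma-separated list such as "a, b, c" into a set,
// trimming whitespace around each element, as the diff's helpers do.
func toSet(s string) map[string]struct{} {
	parts := strings.Split(s, ",")
	set := make(map[string]struct{}, len(parts))
	for _, p := range parts {
		set[strings.TrimSpace(p)] = struct{}{}
	}
	return set
}

// containsAll reports whether every element of rhs is present in lhs
// (the checkSetContainsAll semantics).
func containsAll(lhs, rhs string) bool {
	set := toSet(lhs)
	for _, r := range strings.Split(rhs, ",") {
		if _, ok := set[strings.TrimSpace(r)]; !ok {
			return false
		}
	}
	return true
}

// containsAny reports whether at least one element of rhs is present
// in lhs (the new checkSetContainsAny semantics).
func containsAny(lhs, rhs string) bool {
	set := toSet(lhs)
	for _, r := range strings.Split(rhs, ",") {
		if _, ok := set[strings.TrimSpace(r)]; ok {
			return true
		}
	}
	return false
}

func main() {
	node := "gpu, ssd, fast-network" // hypothetical node attribute value
	fmt.Println(containsAll(node, "gpu,ssd"))        // true: both present
	fmt.Println(containsAll(node, "gpu,infiniband")) // false: infiniband missing
	fmt.Println(containsAny(node, "gpu,infiniband")) // true: gpu matches
}
```

In both cases the left-hand side is loaded into a lookup map and the right-hand values probe it, so each check stays linear in the combined list lengths.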
diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 7b2129e1c..7917077f7 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -309,7 +309,7 @@ func TestResolveConstraintTarget(t *testing.T) { } for _, tc := range cases { - res, ok := resolveConstraintTarget(tc.target, tc.node) + res, ok := resolveTarget(tc.target, tc.node) if ok != tc.result { t.Fatalf("TC: %#v, Result: %v %v", tc, res, ok) } @@ -460,7 +460,7 @@ func TestCheckVersionConstraint(t *testing.T) { } for _, tc := range cases { _, ctx := testContext(t) - if res := checkVersionConstraint(ctx, tc.lVal, tc.rVal); res != tc.result { + if res := checkVersionMatch(ctx, tc.lVal, tc.rVal); res != tc.result { t.Fatalf("TC: %#v, Result: %v", tc, res) } } @@ -495,7 +495,7 @@ func TestCheckRegexpConstraint(t *testing.T) { } for _, tc := range cases { _, ctx := testContext(t) - if res := checkRegexpConstraint(ctx, tc.lVal, tc.rVal); res != tc.result { + if res := checkRegexpMatch(ctx, tc.lVal, tc.rVal); res != tc.result { t.Fatalf("TC: %#v, Result: %v", tc, res) } } diff --git a/scheduler/propertyset.go b/scheduler/propertyset.go index cacea9ae6..2edb519fe 100644 --- a/scheduler/propertyset.go +++ b/scheduler/propertyset.go @@ -121,10 +121,10 @@ func (p *propertySet) populateExisting(constraint *structs.Constraint) { } // Filter to the correct set of allocs - allocs = p.filterAllocs(allocs, true) + allocs = filterAllocs(allocs, true, p.taskGroup) // Get all the nodes that have been used by the allocs - nodes, err := p.buildNodeMap(allocs) + nodes, err := buildNodeMap(p.ctx.State(), allocs) if err != nil { p.errorBuilding = err p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) @@ -132,7 +132,7 @@ func (p *propertySet) populateExisting(constraint *structs.Constraint) { } // Build existing properties map - p.populateProperties(allocs, nodes, p.existingValues) + populateProperties(p.constraint.LTarget, allocs, nodes, p.existingValues) } // PopulateProposed populates the proposed values and recomputes any cleared @@ -149,20 +149,20 @@ func (p *propertySet) PopulateProposed() { for _, updates := range p.ctx.Plan().NodeUpdate { stopping = append(stopping, updates...) } - stopping = p.filterAllocs(stopping, false) + stopping = filterAllocs(stopping, false, p.taskGroup) // Gather the proposed allocations var proposed []*structs.Allocation for _, pallocs := range p.ctx.Plan().NodeAllocation { proposed = append(proposed, pallocs...) } - proposed = p.filterAllocs(proposed, true) + proposed = filterAllocs(proposed, true, p.taskGroup) // Get the used nodes both := make([]*structs.Allocation, 0, len(stopping)+len(proposed)) both = append(both, stopping...) both = append(both, proposed...) 
- nodes, err := p.buildNodeMap(both) + nodes, err := buildNodeMap(p.ctx.State(), both) if err != nil { p.errorBuilding = err p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err) @@ -170,10 +170,10 @@ } // Populate the cleared values - p.populateProperties(stopping, nodes, p.clearedValues) + populateProperties(p.constraint.LTarget, stopping, nodes, p.clearedValues) // Populate the proposed values - p.populateProperties(proposed, nodes, p.proposedValues) + populateProperties(p.constraint.LTarget, proposed, nodes, p.proposedValues) // Remove any cleared value that is now being used by the proposed allocs for value := range p.proposedValues { @@ -247,7 +247,7 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin // filterAllocs filters a set of allocations to just be those that are running // and if the property set is operating at a task group level, for allocations // for that task group -func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal bool) []*structs.Allocation { +func filterAllocs(allocs []*structs.Allocation, filterTerminal bool, taskGroup string) []*structs.Allocation { n := len(allocs) for i := 0; i < n; i++ { remove := false @@ -257,8 +257,8 @@ // If the constraint is on the task group filter the allocations to just // those on the task group - if p.taskGroup != "" { - remove = remove || allocs[i].TaskGroup != p.taskGroup + if taskGroup != "" { + remove = remove || allocs[i].TaskGroup != taskGroup } if remove { @@ -272,7 +272,7 @@ // buildNodeMap takes a list of allocations and returns a map of the nodes used // by those allocations -func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) { +func buildNodeMap(state State, allocs []*structs.Allocation) (map[string]*structs.Node, error) { // Get all the nodes that have been used by the allocs nodes := make(map[string]*structs.Node) ws := memdb.NewWatchSet() @@ -281,7 +281,7 @@ func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*st continue } - node, err := p.ctx.State().NodeByID(ws, alloc.NodeID) + node, err := state.NodeByID(ws, alloc.NodeID) if err != nil { return nil, fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err) } @@ -294,11 +294,11 @@ // populateProperties goes through all allocations and builds up the used // properties from the nodes, storing the results in the passed properties map.
-func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map[string]*structs.Node, +func populateProperties(lTarget string, allocs []*structs.Allocation, nodes map[string]*structs.Node, properties map[string]uint64) { for _, alloc := range allocs { - nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget) + nProperty, ok := getProperty(nodes[alloc.NodeID], lTarget) if !ok { continue } @@ -313,7 +313,7 @@ func getProperty(n *structs.Node, property string) (string, bool) { return "", false } - val, ok := resolveConstraintTarget(property, n) + val, ok := resolveTarget(property, n) if !ok { return "", false } diff --git a/scheduler/rank.go b/scheduler/rank.go index 0a49bb49d..8038338ce 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -3,15 +3,24 @@ package scheduler import ( "fmt" + "math" + "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // binPackingMaxFitScore is the maximum possible bin packing fitness score. + // This is used to normalize the bin packing score to a value between 0 and 1 + binPackingMaxFitScore = 18.0 +) + // Rank is used to provide a score and various ranking metadata // along with a node when iterating. This state can be modified as // various rank methods are applied. type RankedNode struct { Node *structs.Node - Score float64 + FinalScore float64 + Scores []float64 TaskResources map[string]*structs.Resources // Allocs is used to cache the proposed allocations on the @@ -20,7 +29,7 @@ type RankedNode struct { } func (r *RankedNode) GoString() string { - return fmt.Sprintf("<Node: %s Score: %0.3f>", r.Node.ID, r.Score) + return fmt.Sprintf("<Node: %s Score: %0.3f>", r.Node.ID, r.FinalScore) } func (r *RankedNode) ProposedAllocs(ctx Context) ([]*structs.Allocation, error) { @@ -231,8 +240,9 @@ OUTER: // Score the fit normally otherwise fitness := structs.ScoreFit(option.Node, util) - option.Score += fitness - iter.ctx.Metrics().ScoreNode(option.Node, "binpack", fitness) + normalizedFit := float64(fitness) / float64(binPackingMaxFitScore) + option.Scores = append(option.Scores, normalizedFit) + iter.ctx.Metrics().ScoreNode(option.Node, "binpack", normalizedFit) return option } } @@ -245,26 +255,31 @@ func (iter *BinPackIterator) Reset() { // alongside other allocations from this job. This is used to help distribute // load across the cluster. type JobAntiAffinityIterator struct { - ctx Context - source RankIterator - penalty float64 - jobID string + ctx Context + source RankIterator + jobID string + taskGroup string + desiredCount int } // NewJobAntiAffinityIterator is used to create a JobAntiAffinityIterator that // applies a penalty for co-placement with allocs from this job.
-func NewJobAntiAffinityIterator(ctx Context, source RankIterator, penalty float64, jobID string) *JobAntiAffinityIterator { +func NewJobAntiAffinityIterator(ctx Context, source RankIterator, jobID string) *JobAntiAffinityIterator { iter := &JobAntiAffinityIterator{ - ctx: ctx, - source: source, - penalty: penalty, - jobID: jobID, + ctx: ctx, + source: source, + jobID: jobID, } return iter } -func (iter *JobAntiAffinityIterator) SetJob(jobID string) { - iter.jobID = jobID +func (iter *JobAntiAffinityIterator) SetJob(job *structs.Job) { + iter.jobID = job.ID +} + +func (iter *JobAntiAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) { + iter.taskGroup = tg.Name + iter.desiredCount = tg.Count } func (iter *JobAntiAffinityIterator) Next() *RankedNode { @@ -286,15 +301,16 @@ func (iter *JobAntiAffinityIterator) Next() *RankedNode { // Determine the number of collisions collisions := 0 for _, alloc := range proposed { - if alloc.JobID == iter.jobID { + if alloc.JobID == iter.jobID && alloc.TaskGroup == iter.taskGroup { collisions += 1 } } - // Apply a penalty if there are collisions + // Calculate the penalty based on number of collisions + // TODO(preetha): Figure out if batch jobs need a different scoring penalty where collisions matter less if collisions > 0 { - scorePenalty := -1 * float64(collisions) * iter.penalty - option.Score += scorePenalty + scorePenalty := -1 * float64(collisions) / float64(iter.desiredCount) + option.Scores = append(option.Scores, scorePenalty) iter.ctx.Metrics().ScoreNode(option.Node, "job-anti-affinity", scorePenalty) } return option @@ -305,32 +321,30 @@ func (iter *JobAntiAffinityIterator) Reset() { iter.source.Reset() } -// NodeAntiAffinityIterator is used to apply a penalty to +// NodeReschedulingPenaltyIterator is used to apply a penalty to // a node that had a previous failed allocation for the same job. 
// This is used when attempting to reschedule a failed alloc -type NodeAntiAffinityIterator struct { +type NodeReschedulingPenaltyIterator struct { ctx Context source RankIterator - penalty float64 penaltyNodes map[string]struct{} } -// NewNodeAntiAffinityIterator is used to create a NodeAntiAffinityIterator that -// applies the given penalty for placement onto nodes in penaltyNodes +// NewNodeReschedulingPenaltyIterator is used to create a NodeReschedulingPenaltyIterator that +// applies a scoring penalty for placement onto nodes in penaltyNodes +func NewNodeReschedulingPenaltyIterator(ctx Context, source RankIterator) *NodeReschedulingPenaltyIterator { + iter := &NodeReschedulingPenaltyIterator{ + ctx: ctx, + source: source, } return iter } -func (iter *NodeAntiAffinityIterator) SetPenaltyNodes(penaltyNodes map[string]struct{}) { +func (iter *NodeReschedulingPenaltyIterator) SetPenaltyNodes(penaltyNodes map[string]struct{}) { iter.penaltyNodes = penaltyNodes } -func (iter *NodeAntiAffinityIterator) Next() *RankedNode { +func (iter *NodeReschedulingPenaltyIterator) Next() *RankedNode { for { option := iter.source.Next() if option == nil { @@ -339,14 +353,137 @@ _, ok := iter.penaltyNodes[option.Node.ID] if ok { - option.Score -= iter.penalty - iter.ctx.Metrics().ScoreNode(option.Node, "node-anti-affinity", iter.penalty) + option.Scores = append(option.Scores, -1) + iter.ctx.Metrics().ScoreNode(option.Node, "node-reschedule-penalty", -1) } return option } } -func (iter *NodeAntiAffinityIterator) Reset() { +func (iter *NodeReschedulingPenaltyIterator) Reset() { iter.penaltyNodes = make(map[string]struct{}) iter.source.Reset() } + +type NodeAffinityIterator struct { + ctx Context + source RankIterator + jobAffinities []*structs.Affinity + affinities []*structs.Affinity +} + +func NewNodeAffinityIterator(ctx Context, source RankIterator) *NodeAffinityIterator { + return &NodeAffinityIterator{ + ctx: ctx, + source: source, + } +} + +func (iter *NodeAffinityIterator) SetJob(job *structs.Job) { + if job.Affinities != nil { + iter.jobAffinities = job.Affinities + } +} + +func (iter *NodeAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) { + // Merge job affinities + if iter.jobAffinities != nil { + iter.affinities = append(iter.affinities, iter.jobAffinities...) + } + + // Merge task group affinities and task affinities + if tg.Affinities != nil { + iter.affinities = append(iter.affinities, tg.Affinities...) + } + for _, task := range tg.Tasks { + if task.Affinities != nil { + iter.affinities = append(iter.affinities, task.Affinities...)
+ } + } +} + +func (iter *NodeAffinityIterator) Reset() { + iter.source.Reset() + // This method is called between each task group, so only reset the merged list + iter.affinities = nil +} + +func (iter *NodeAffinityIterator) hasAffinities() bool { + return len(iter.affinities) > 0 +} + +func (iter *NodeAffinityIterator) Next() *RankedNode { + option := iter.source.Next() + if option == nil { + return nil + } + if len(iter.affinities) == 0 { + return option + } + // TODO(preetha): we should calculate normalized weights once and reuse it here + sumWeight := 0.0 + for _, affinity := range iter.affinities { + sumWeight += math.Abs(affinity.Weight) + } + + totalAffinityScore := 0.0 + for _, affinity := range iter.affinities { + if matchesAffinity(iter.ctx, affinity, option.Node) { + normScore := affinity.Weight / sumWeight + totalAffinityScore += normScore + } + } + if totalAffinityScore != 0.0 { + option.Scores = append(option.Scores, totalAffinityScore) + iter.ctx.Metrics().ScoreNode(option.Node, "node-affinity", totalAffinityScore) + } + return option +} + +func matchesAffinity(ctx Context, affinity *structs.Affinity, option *structs.Node) bool { + // Resolve the targets + lVal, ok := resolveTarget(affinity.LTarget, option) + if !ok { + return false + } + rVal, ok := resolveTarget(affinity.RTarget, option) + if !ok { + return false + } + + // Check if satisfied + return checkAffinity(ctx, affinity.Operand, lVal, rVal) +} + +type ScoreNormalizationIterator struct { + ctx Context + source RankIterator +} + +func NewScoreNormalizationIterator(ctx Context, source RankIterator) *ScoreNormalizationIterator { + return &ScoreNormalizationIterator{ + ctx: ctx, + source: source} +} + +func (iter *ScoreNormalizationIterator) Reset() { + iter.source.Reset() +} + +func (iter *ScoreNormalizationIterator) Next() *RankedNode { + option := iter.source.Next() + if option == nil { + return nil + } + numScorers := len(option.Scores) + if numScorers > 0 { + sum := 0.0 + for _, score := range option.Scores { + sum += score + } + option.FinalScore = sum / float64(numScorers) + } + //TODO(preetha): Turn map in allocmetrics into a heap of topK scores + iter.ctx.Metrics().ScoreNode(option.Node, "normalized-score", option.FinalScore) + return option +} diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 6828db58c..23cd408ab 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -85,7 +85,9 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) { binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) - out := collectRanked(binp) + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + + out := collectRanked(scoreNorm) if len(out) != 2 { t.Fatalf("Bad: %v", out) } @@ -93,11 +95,11 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) { t.Fatalf("Bad: %v", out) } - if out[0].Score != 18 { - t.Fatalf("Bad: %v", out[0]) + if out[0].FinalScore != 1.0 { + t.Fatalf("Bad Score: %v", out[0].FinalScore) } - if out[1].Score < 10 || out[1].Score > 16 { - t.Fatalf("Bad: %v", out[1]) + if out[1].FinalScore < 0.75 || out[1].FinalScore > 0.95 { + t.Fatalf("Bad Score: %v", out[1].FinalScore) } } @@ -164,16 +166,18 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) { binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) - out := collectRanked(binp) + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + + out := collectRanked(scoreNorm) if len(out) != 1 { t.Fatalf("Bad: %#v", out) } if out[0] != nodes[1] { - t.Fatalf("Bad: %v", out) + 
t.Fatalf("Bad Score: %v", out) } - if out[0].Score != 18 { - t.Fatalf("Bad: %v", out[0]) + if out[0].FinalScore != 1.0 { + t.Fatalf("Bad Score: %v", out[0].FinalScore) } } @@ -254,15 +258,17 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) { binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) - out := collectRanked(binp) + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + + out := collectRanked(scoreNorm) if len(out) != 1 { t.Fatalf("Bad: %#v", out) } if out[0] != nodes[1] { t.Fatalf("Bad: %v", out) } - if out[0].Score != 18 { - t.Fatalf("Bad: %v", out[0]) + if out[0].FinalScore != 1.0 { + t.Fatalf("Bad Score: %v", out[0].FinalScore) } } @@ -348,18 +354,20 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) { binp := NewBinPackIterator(ctx, static, false, 0) binp.SetTaskGroup(taskGroup) - out := collectRanked(binp) + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + + out := collectRanked(scoreNorm) if len(out) != 2 { t.Fatalf("Bad: %#v", out) } if out[0] != nodes[0] || out[1] != nodes[1] { t.Fatalf("Bad: %v", out) } - if out[0].Score < 10 || out[0].Score > 16 { - t.Fatalf("Bad: %v", out[0]) + if out[0].FinalScore < 0.50 || out[0].FinalScore > 0.95 { + t.Fatalf("Bad Score: %v", out[0].FinalScore) } - if out[1].Score != 18 { - t.Fatalf("Bad: %v", out[1]) + if out[1].FinalScore != 1 { + t.Fatalf("Bad Score: %v", out[1].FinalScore) } } @@ -379,16 +387,23 @@ func TestJobAntiAffinity_PlannedAlloc(t *testing.T) { } static := NewStaticRankIterator(ctx, nodes) + job := mock.Job() + job.ID = "foo" + tg := job.TaskGroups[0] + tg.Count = 4 + // Add a planned alloc to node1 that fills it plan := ctx.Plan() plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{ { - ID: uuid.Generate(), - JobID: "foo", + ID: uuid.Generate(), + JobID: "foo", + TaskGroup: tg.Name, }, { - ID: uuid.Generate(), - JobID: "foo", + ID: uuid.Generate(), + JobID: "foo", + TaskGroup: tg.Name, }, } @@ -399,24 +414,29 @@ func TestJobAntiAffinity_PlannedAlloc(t *testing.T) { }, } - binp := NewJobAntiAffinityIterator(ctx, static, 5.0, "foo") + jobAntiAff := NewJobAntiAffinityIterator(ctx, static, "foo") + jobAntiAff.SetJob(job) + jobAntiAff.SetTaskGroup(tg) - out := collectRanked(binp) + scoreNorm := NewScoreNormalizationIterator(ctx, jobAntiAff) + + out := collectRanked(scoreNorm) if len(out) != 2 { t.Fatalf("Bad: %#v", out) } if out[0] != nodes[0] { t.Fatalf("Bad: %v", out) } - if out[0].Score != -10.0 { - t.Fatalf("Bad: %#v", out[0]) + // Score should be -(#collissions/desired_count) => -(2/4) + if out[0].FinalScore != -0.5 { + t.Fatalf("Bad Score: %#v", out[0].FinalScore) } if out[1] != nodes[1] { t.Fatalf("Bad: %v", out) } - if out[1].Score != 0.0 { - t.Fatalf("Bad: %v", out[1]) + if out[1].FinalScore != 0.0 { + t.Fatalf("Bad Score: %v", out[1].FinalScore) } } @@ -450,17 +470,168 @@ func TestNodeAntiAffinity_PenaltyNodes(t *testing.T) { } static := NewStaticRankIterator(ctx, nodes) - nodeAntiAffIter := NewNodeAntiAffinityIterator(ctx, static, 50.0) + nodeAntiAffIter := NewNodeReschedulingPenaltyIterator(ctx, static) nodeAntiAffIter.SetPenaltyNodes(map[string]struct{}{node1.ID: {}}) - out := collectRanked(nodeAntiAffIter) + scoreNorm := NewScoreNormalizationIterator(ctx, nodeAntiAffIter) + + out := collectRanked(scoreNorm) require := require.New(t) require.Equal(2, len(out)) require.Equal(node1.ID, out[0].Node.ID) - require.Equal(-50.0, out[0].Score) + require.Equal(-1.0, out[0].FinalScore) require.Equal(node2.ID, out[1].Node.ID) - require.Equal(0.0, 
out[1].Score) + require.Equal(0.0, out[1].FinalScore) + +} + +func TestScoreNormalizationIterator(t *testing.T) { + // Test normalized scores when there is more than one scorer + _, ctx := testContext(t) + nodes := []*RankedNode{ + { + Node: &structs.Node{ + ID: uuid.Generate(), + }, + }, + { + Node: &structs.Node{ + ID: uuid.Generate(), + }, + }, + } + static := NewStaticRankIterator(ctx, nodes) + + job := mock.Job() + job.ID = "foo" + tg := job.TaskGroups[0] + tg.Count = 4 + + // Add a planned alloc to node1 that fills it + plan := ctx.Plan() + plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{ + { + ID: uuid.Generate(), + JobID: "foo", + TaskGroup: tg.Name, + }, + { + ID: uuid.Generate(), + JobID: "foo", + TaskGroup: tg.Name, + }, + } + + // Add a planned alloc to node2 that half fills it + plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{ + { + JobID: "bar", + }, + } + + jobAntiAff := NewJobAntiAffinityIterator(ctx, static, "foo") + jobAntiAff.SetJob(job) + jobAntiAff.SetTaskGroup(tg) + + nodeReschedulePenaltyIter := NewNodeReschedulingPenaltyIterator(ctx, jobAntiAff) + nodeReschedulePenaltyIter.SetPenaltyNodes(map[string]struct{}{nodes[0].Node.ID: {}}) + + scoreNorm := NewScoreNormalizationIterator(ctx, nodeReschedulePenaltyIter) + + out := collectRanked(scoreNorm) + if len(out) != 2 { + t.Fatalf("Bad: %#v", out) + } + if out[0] != nodes[0] { + t.Fatalf("Bad: %v", out) + } + // Score should be averaged between both scorers + // -0.5 from job anti-affinity and -1 from node rescheduling penalty + if out[0].FinalScore != -0.75 { + t.Fatalf("Bad Score: %#v", out[0].FinalScore) + } + + if out[1] != nodes[1] { + t.Fatalf("Bad Node: %v", out) + } + if out[1].FinalScore != 0.0 { + t.Fatalf("Bad Score: %v", out[1].FinalScore) + } +} + +func TestNodeAffinityIterator(t *testing.T) { + _, ctx := testContext(t) + nodes := []*RankedNode{ + {Node: mock.Node()}, + {Node: mock.Node()}, + {Node: mock.Node()}, + {Node: mock.Node()}, + } + + nodes[0].Node.Attributes["kernel.version"] = "4.9" + nodes[1].Node.Datacenter = "dc2" + nodes[2].Node.Datacenter = "dc2" + nodes[2].Node.NodeClass = "large" + + affinities := []*structs.Affinity{ + { + Operand: "=", + LTarget: "${node.datacenter}", + RTarget: "dc1", + Weight: 200, + }, + { + Operand: "=", + LTarget: "${node.datacenter}", + RTarget: "dc2", + Weight: -100, + }, + { + Operand: "version", + LTarget: "${attr.kernel.version}", + RTarget: ">4.0", + Weight: 50, + }, + { + Operand: "is", + LTarget: "${node.class}", + RTarget: "large", + Weight: 50, + }, + } + + static := NewStaticRankIterator(ctx, nodes) + + job := mock.Job() + job.ID = "foo" + tg := job.TaskGroups[0] + tg.Affinities = affinities + + nodeAffinity := NewNodeAffinityIterator(ctx, static) + nodeAffinity.SetTaskGroup(tg) + + scoreNorm := NewScoreNormalizationIterator(ctx, nodeAffinity) + + out := collectRanked(scoreNorm) + expectedScores := make(map[string]float64) + // Total weight = 400 + // Node 0 matches two affinities (dc and kernel version), total weight = 250 + expectedScores[nodes[0].Node.ID] = 0.625 + + // Node 1 matches the dc2 anti-affinity, weight = -100 + expectedScores[nodes[1].Node.ID] = -0.25 + + // Node 2 matches the dc2 anti-affinity (-100) and the node class affinity (50), total weight = -50 + expectedScores[nodes[2].Node.ID] = -0.125 + + // Node 3 matches one affinity (dc) with weight = 200 + expectedScores[nodes[3].Node.ID] = 0.5 + + require := require.New(t) + for _, n := range out { + require.Equal(expectedScores[n.Node.ID], n.FinalScore) + } } diff --git a/scheduler/select.go
b/scheduler/select.go index 207c40c54..27bf6ac22 100644 --- a/scheduler/select.go +++ b/scheduler/select.go @@ -43,7 +43,7 @@ func (iter *LimitIterator) Next() *RankedNode { if len(iter.skippedNodes) < iter.maxSkip { // Try skipping ahead up to maxSkip to find an option with score lesser than the threshold - for option != nil && option.Score <= iter.scoreThreshold && len(iter.skippedNodes) < iter.maxSkip { + for option != nil && option.FinalScore <= iter.scoreThreshold && len(iter.skippedNodes) < iter.maxSkip { iter.skippedNodes = append(iter.skippedNodes, option) option = iter.source.Next() } @@ -104,7 +104,7 @@ func (iter *MaxScoreIterator) Next() *RankedNode { return iter.max } - if iter.max == nil || option.Score > iter.max.Score { + if iter.max == nil || option.FinalScore > iter.max.FinalScore { iter.max = option } } diff --git a/scheduler/select_test.go b/scheduler/select_test.go index 1e50d05b0..7625acdff 100644 --- a/scheduler/select_test.go +++ b/scheduler/select_test.go @@ -12,16 +12,16 @@ func TestLimitIterator(t *testing.T) { _, ctx := testContext(t) nodes := []*RankedNode{ { - Node: mock.Node(), - Score: 1, + Node: mock.Node(), + FinalScore: 1, }, { - Node: mock.Node(), - Score: 2, + Node: mock.Node(), + FinalScore: 2, }, { - Node: mock.Node(), - Score: 3, + Node: mock.Node(), + FinalScore: 3, }, } static := NewStaticRankIterator(ctx, nodes) @@ -73,26 +73,26 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) { desc: "Skips one low scoring node", nodes: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, { - Node: nodes[1], - Score: 2, + Node: nodes[1], + FinalScore: 2, }, { - Node: nodes[2], - Score: 3, + Node: nodes[2], + FinalScore: 3, }, }, expectedOut: []*RankedNode{ { - Node: nodes[1], - Score: 2, + Node: nodes[1], + FinalScore: 2, }, { - Node: nodes[2], - Score: 3, + Node: nodes[2], + FinalScore: 3, }, }, threshold: -1, @@ -103,30 +103,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) { desc: "Skips maxSkip scoring nodes", nodes: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, { - Node: nodes[1], - Score: -2, + Node: nodes[1], + FinalScore: -2, }, { - Node: nodes[2], - Score: 3, + Node: nodes[2], + FinalScore: 3, }, { - Node: nodes[3], - Score: 4, + Node: nodes[3], + FinalScore: 4, }, }, expectedOut: []*RankedNode{ { - Node: nodes[2], - Score: 3, + Node: nodes[2], + FinalScore: 3, }, { - Node: nodes[3], - Score: 4, + Node: nodes[3], + FinalScore: 4, }, }, threshold: -1, @@ -137,30 +137,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) { desc: "maxSkip limit reached", nodes: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, { - Node: nodes[1], - Score: -6, + Node: nodes[1], + FinalScore: -6, }, { - Node: nodes[2], - Score: -3, + Node: nodes[2], + FinalScore: -3, }, { - Node: nodes[3], - Score: -4, + Node: nodes[3], + FinalScore: -4, }, }, expectedOut: []*RankedNode{ { - Node: nodes[2], - Score: -3, + Node: nodes[2], + FinalScore: -3, }, { - Node: nodes[3], - Score: -4, + Node: nodes[3], + FinalScore: -4, }, }, threshold: -1, @@ -171,22 +171,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) { desc: "draw both from skipped nodes", nodes: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, { - Node: nodes[1], - Score: -6, + Node: nodes[1], + FinalScore: -6, }, }, expectedOut: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, { - Node: nodes[1], - Score: -6, + Node: 
nodes[1], + FinalScore: -6, }, }, threshold: -1, @@ -196,22 +196,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) { desc: "one node above threshold, one skipped node", nodes: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, { - Node: nodes[1], - Score: 5, + Node: nodes[1], + FinalScore: 5, }, }, expectedOut: []*RankedNode{ { - Node: nodes[1], - Score: 5, + Node: nodes[1], + FinalScore: 5, }, { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, }, threshold: -1, @@ -222,30 +222,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) { desc: "low scoring nodes interspersed", nodes: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, { - Node: nodes[1], - Score: 5, + Node: nodes[1], + FinalScore: 5, }, { - Node: nodes[2], - Score: -2, + Node: nodes[2], + FinalScore: -2, }, { - Node: nodes[3], - Score: 2, + Node: nodes[3], + FinalScore: 2, }, }, expectedOut: []*RankedNode{ { - Node: nodes[1], - Score: 5, + Node: nodes[1], + FinalScore: 5, }, { - Node: nodes[3], - Score: 2, + Node: nodes[3], + FinalScore: 2, }, }, threshold: -1, @@ -256,14 +256,14 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) { desc: "only one node, score below threshold", nodes: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, }, expectedOut: []*RankedNode{ { - Node: nodes[0], - Score: -1, + Node: nodes[0], + FinalScore: -1, }, }, threshold: -1, @@ -274,22 +274,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) { desc: "maxSkip is more than available nodes", nodes: []*RankedNode{ { - Node: nodes[0], - Score: -2, + Node: nodes[0], + FinalScore: -2, }, { - Node: nodes[1], - Score: 1, + Node: nodes[1], + FinalScore: 1, }, }, expectedOut: []*RankedNode{ { - Node: nodes[1], - Score: 1, + Node: nodes[1], + FinalScore: 1, }, { - Node: nodes[0], - Score: -2, + Node: nodes[0], + FinalScore: -2, }, }, threshold: -1, @@ -320,16 +320,16 @@ func TestMaxScoreIterator(t *testing.T) { _, ctx := testContext(t) nodes := []*RankedNode{ { - Node: mock.Node(), - Score: 1, + Node: mock.Node(), + FinalScore: 1, }, { - Node: mock.Node(), - Score: 2, + Node: mock.Node(), + FinalScore: 2, }, { - Node: mock.Node(), - Score: 3, + Node: mock.Node(), + FinalScore: 3, }, } static := NewStaticRankIterator(ctx, nodes) diff --git a/scheduler/stack.go b/scheduler/stack.go index a324e88f9..f16782d7b 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -8,23 +8,10 @@ import ( ) const ( - // serviceJobAntiAffinityPenalty is the penalty applied - // to the score for placing an alloc on a node that - // already has an alloc for this job. - serviceJobAntiAffinityPenalty = 20.0 - - // batchJobAntiAffinityPenalty is the same as the - // serviceJobAntiAffinityPenalty but for batch type jobs. - batchJobAntiAffinityPenalty = 10.0 - - // previousFailedAllocNodePenalty is a scoring penalty for nodes - // that a failed allocation was previously run on - previousFailedAllocNodePenalty = 50.0 - // skipScoreThreshold is a threshold used in the limit iterator to skip nodes - // that have a score lower than this. -10 is the highest possible score for a - // node with penalty (based on batchJobAntiAffinityPenalty) - skipScoreThreshold = -10.0 + // that have a score lower than this. 
-1 is the lowest possible score for a + // node with penalties (based on job anti-affinity and node rescheduling penalties) + skipScoreThreshold = 0.0 // maxSkip limits the number of nodes that can be skipped in the limit iterator maxSkip = 3 @@ -66,9 +53,11 @@ type GenericStack struct { distinctPropertyConstraint *DistinctPropertyIterator binPack *BinPackIterator jobAntiAff *JobAntiAffinityIterator - nodeAntiAff *NodeAntiAffinityIterator + nodeReschedulingPenalty *NodeReschedulingPenaltyIterator limit *LimitIterator maxScore *MaxScoreIterator + nodeAffinity *NodeAffinityIterator + scoreNorm *ScoreNormalizationIterator } // NewGenericStack constructs a stack used for selecting service placements @@ -121,18 +110,17 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0) // Apply the job anti-affinity iterator. This is to avoid placing - // multiple allocations on the same node for this job. The penalty - // is less for batch jobs as it matters less. - penalty := serviceJobAntiAffinityPenalty - if batch { - penalty = batchJobAntiAffinityPenalty - } - s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, penalty, "") + // multiple allocations on the same node for this job. + s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "") - s.nodeAntiAff = NewNodeAntiAffinityIterator(ctx, s.jobAntiAff, previousFailedAllocNodePenalty) + s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff) + + s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty) + + s.scoreNorm = NewScoreNormalizationIterator(ctx, s.nodeAffinity) // Apply a limit function. This is to avoid scanning *every* possible node. - s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2, skipScoreThreshold, maxSkip) + s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip) // Select the node with the maximum score for placement s.maxScore = NewMaxScoreIterator(ctx, s.limit) @@ -166,7 +154,8 @@ func (s *GenericStack) SetJob(job *structs.Job) { s.distinctHostsConstraint.SetJob(job) s.distinctPropertyConstraint.SetJob(job) s.binPack.SetPriority(job.Priority) - s.jobAntiAff.SetJob(job.ID) + s.jobAntiAff.SetJob(job) + s.nodeAffinity.SetJob(job) s.ctx.Eligibility().SetJob(job) if contextual, ok := s.quota.(ContextualIterator); ok { @@ -206,8 +195,13 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*R s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) s.binPack.SetTaskGroup(tg) + s.jobAntiAff.SetTaskGroup(tg) if options != nil { - s.nodeAntiAff.SetPenaltyNodes(options.PenaltyNodeIDs) + s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs) + } + s.nodeAffinity.SetTaskGroup(tg) + if s.nodeAffinity.hasAffinities() { + s.limit.SetLimit(math.MaxInt32) } if contextual, ok := s.quota.(ContextualIterator); ok { @@ -241,6 +235,7 @@ type SystemStack struct { taskGroupConstraint *ConstraintChecker distinctPropertyConstraint *DistinctPropertyIterator binPack *BinPackIterator + scoreNorm *ScoreNormalizationIterator } // NewSystemStack constructs a stack used for selecting system job placements @@ -283,6 +278,9 @@ func NewSystemStack(ctx Context) *SystemStack { // by a particular task group. Enable eviction as system jobs are high // priority.
s.binPack = NewBinPackIterator(ctx, rankSource, true, 0) + + // Apply score normalization + s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack) return s } @@ -304,6 +302,7 @@ func (s *SystemStack) SetJob(job *structs.Job) { func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources) { // Reset the binpack selector and context + s.scoreNorm.Reset() s.binPack.Reset() s.ctx.Reset() start := time.Now() @@ -323,7 +322,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*Ra } // Get the next option that satisfies the constraints. - option := s.binPack.Next() + option := s.scoreNorm.Next() // Ensure that the task resources were specified if option != nil && len(option.TaskResources) != len(tg.Tasks) { diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index cf8084ea8..c94aa9208 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -310,7 +310,8 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) { if met.ClassExhausted["linux-medium-pci"] != 1 { t.Fatalf("bad: %#v", met) } - if len(met.Scores) != 1 { + // Expect two metrics, one with bin packing score and one with normalized score + if len(met.Scores) != 2 { t.Fatalf("bad: %#v", met) } } @@ -531,7 +532,8 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) { if met.ClassExhausted["linux-medium-pci"] != 1 { t.Fatalf("bad: %#v", met) } - if len(met.Scores) != 1 { + // Should have two scores, one from bin packing and one from normalization + if len(met.Scores) != 2 { t.Fatalf("bad: %#v", met) } }
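Reviewer summary of the new scoring model: each ranking iterator now appends a value in roughly [-1, 1] to RankedNode.Scores — bin-packing fitness divided by binPackingMaxFitScore, job anti-affinity scored as -collisions/desiredCount, a flat -1 node rescheduling penalty, and node affinity scored as the matched weight over the sum of absolute weights — and ScoreNormalizationIterator averages them into FinalScore. The sketch below works that arithmetic through once; the 18.0 constant and the formulas come from this diff, while the node inputs are hypothetical:

```go
package main

import (
	"fmt"
	"math"
)

// binPackingMaxFitScore mirrors the constant added in rank.go; dividing
// the raw bin-packing fitness by it yields a score in [0, 1].
const binPackingMaxFitScore = 18.0

// finalScore averages the per-scorer values, which is what
// ScoreNormalizationIterator does for each RankedNode.
func finalScore(scores []float64) float64 {
	if len(scores) == 0 {
		return 0
	}
	sum := 0.0
	for _, s := range scores {
		sum += s
	}
	return sum / float64(len(scores))
}

func main() {
	// Hypothetical inputs for one candidate node.
	rawFitness := 14.0 // raw structs.ScoreFit result, between 0 and 18
	collisions := 2    // allocs of the same task group already on the node
	desiredCount := 4  // task group count
	matchedWeight := 50.0
	sumWeight := math.Abs(200) + math.Abs(-100) + math.Abs(50) // all affinity weights

	scores := []float64{
		rawFitness / binPackingMaxFitScore,           // binpack: ~0.778
		-float64(collisions) / float64(desiredCount), // job-anti-affinity: -0.5
		-1,                                           // node-reschedule-penalty
		matchedWeight / sumWeight,                    // node-affinity: ~0.143
	}
	fmt.Printf("final score: %0.3f\n", finalScore(scores)) // prints -0.145
}
```

A node whose penalties outweigh its fit ends up below zero, which is consistent with skipScoreThreshold moving from -10.0 to 0.0: the limit iterator can now skip any net-penalized node.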