Implement affinity support in generic scheduler

This commit is contained in:
Preetha Appan
2018-07-16 08:47:18 -05:00
parent f907c42ba5
commit 00924555a8
10 changed files with 574 additions and 220 deletions

View File

@@ -34,8 +34,8 @@ type Context interface {
// RegexpCache is a cache of regular expressions
RegexpCache() map[string]*regexp.Regexp
// ConstraintCache is a cache of version constraints
ConstraintCache() map[string]version.Constraints
// VersionConstraintCache is a cache of version constraints
VersionConstraintCache() map[string]version.Constraints
// Eligibility returns a tracker for node eligibility in the context of the
// eval.
@@ -54,7 +54,8 @@ func (e *EvalCache) RegexpCache() map[string]*regexp.Regexp {
}
return e.reCache
}
func (e *EvalCache) ConstraintCache() map[string]version.Constraints {
func (e *EvalCache) VersionConstraintCache() map[string]version.Constraints {
if e.constraintCache == nil {
e.constraintCache = make(map[string]version.Constraints)
}

View File

@@ -403,11 +403,11 @@ func (c *ConstraintChecker) Feasible(option *structs.Node) bool {
func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, option *structs.Node) bool {
// Resolve the targets
lVal, ok := resolveConstraintTarget(constraint.LTarget, option)
lVal, ok := resolveTarget(constraint.LTarget, option)
if !ok {
return false
}
rVal, ok := resolveConstraintTarget(constraint.RTarget, option)
rVal, ok := resolveTarget(constraint.RTarget, option)
if !ok {
return false
}
@@ -416,8 +416,8 @@ func (c *ConstraintChecker) meetsConstraint(constraint *structs.Constraint, opti
return checkConstraint(c.ctx, constraint.Operand, lVal, rVal)
}
// resolveConstraintTarget is used to resolve the LTarget and RTarget of a Constraint
func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bool) {
// resolveTarget is used to resolve the LTarget and RTarget of a Constraint
func resolveTarget(target string, node *structs.Node) (interface{}, bool) {
// If no prefix, this must be a literal value
if !strings.HasPrefix(target, "${") {
return target, true
@@ -470,16 +470,28 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
case "<", "<=", ">", ">=":
return checkLexicalOrder(operand, lVal, rVal)
case structs.ConstraintVersion:
return checkVersionConstraint(ctx, lVal, rVal)
return checkVersionMatch(ctx, lVal, rVal)
case structs.ConstraintRegex:
return checkRegexpConstraint(ctx, lVal, rVal)
return checkRegexpMatch(ctx, lVal, rVal)
case structs.ConstraintSetContains:
return checkSetContainsConstraint(ctx, lVal, rVal)
return checkSetContainsAll(ctx, lVal, rVal)
default:
return false
}
}
// checkAffinity returns true when the given affinity operand is satisfied by
// the resolved left and right hand side values. The set-contains-any and
// set-contains-all operands are affinity-specific; every other operand is
// delegated to the shared constraint checker.
func checkAffinity(ctx Context, operand string, lVal, rVal interface{}) bool {
	if operand == structs.AffinitySetContainsAny {
		return checkSetContainsAny(ctx, lVal, rVal)
	}
	if operand == structs.AffinitySetContainsAll {
		return checkSetContainsAll(ctx, lVal, rVal)
	}
	// Affinities support all of the constraint operands as well
	return checkConstraint(ctx, operand, lVal, rVal)
}
// checkLexicalOrder is used to check for lexical ordering
func checkLexicalOrder(op string, lVal, rVal interface{}) bool {
// Ensure the values are strings
@@ -506,9 +518,9 @@ func checkLexicalOrder(op string, lVal, rVal interface{}) bool {
}
}
// checkVersionConstraint is used to compare a version on the
// checkVersionMatch is used to compare a version on the
// left hand side with a set of constraints on the right hand side
func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkVersionMatch(ctx Context, lVal, rVal interface{}) bool {
// Parse the version
var versionStr string
switch v := lVal.(type) {
@@ -533,7 +545,7 @@ func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
}
// Check the cache for a match
cache := ctx.ConstraintCache()
cache := ctx.VersionConstraintCache()
constraints := cache[constraintStr]
// Parse the constraints
@@ -549,9 +561,9 @@ func checkVersionConstraint(ctx Context, lVal, rVal interface{}) bool {
return constraints.Check(vers)
}
// checkRegexpConstraint is used to compare a value on the
// checkRegexpMatch is used to compare a value on the
// left hand side with a regexp on the right hand side
func checkRegexpConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
@@ -582,9 +594,9 @@ func checkRegexpConstraint(ctx Context, lVal, rVal interface{}) bool {
return re.MatchString(lStr)
}
// checkSetContainsConstraint is used to see if the left hand side contains the
// checkSetContainsAll is used to see if the left hand side contains the
// string on the right hand side
func checkSetContainsConstraint(ctx Context, lVal, rVal interface{}) bool {
func checkSetContainsAll(ctx Context, lVal, rVal interface{}) bool {
// Ensure left-hand is string
lStr, ok := lVal.(string)
if !ok {
@@ -614,6 +626,38 @@ func checkSetContainsConstraint(ctx Context, lVal, rVal interface{}) bool {
return true
}
// checkSetContainsAny returns true if any element of the comma separated
// right hand side list appears in the comma separated left hand side list.
// Both operands must be strings; surrounding whitespace on each element is
// ignored. ctx is unused but kept for signature parity with the other
// check helpers.
func checkSetContainsAny(ctx Context, lVal, rVal interface{}) bool {
	// Both sides must be strings
	lStr, lOk := lVal.(string)
	rStr, rOk := rVal.(string)
	if !lOk || !rOk {
		return false
	}

	// Index the left hand side values for O(1) membership tests
	members := make(map[string]struct{})
	for _, item := range strings.Split(lStr, ",") {
		members[strings.TrimSpace(item)] = struct{}{}
	}

	// A single match satisfies the affinity
	for _, want := range strings.Split(rStr, ",") {
		if _, found := members[strings.TrimSpace(want)]; found {
			return true
		}
	}
	return false
}
// FeasibilityWrapper is a FeasibleIterator which wraps both job and task group
// FeasibilityCheckers in which feasibility checking can be skipped if the
// computed node class has previously been marked as eligible or ineligible.

View File

@@ -309,7 +309,7 @@ func TestResolveConstraintTarget(t *testing.T) {
}
for _, tc := range cases {
res, ok := resolveConstraintTarget(tc.target, tc.node)
res, ok := resolveTarget(tc.target, tc.node)
if ok != tc.result {
t.Fatalf("TC: %#v, Result: %v %v", tc, res, ok)
}
@@ -460,7 +460,7 @@ func TestCheckVersionConstraint(t *testing.T) {
}
for _, tc := range cases {
_, ctx := testContext(t)
if res := checkVersionConstraint(ctx, tc.lVal, tc.rVal); res != tc.result {
if res := checkVersionMatch(ctx, tc.lVal, tc.rVal); res != tc.result {
t.Fatalf("TC: %#v, Result: %v", tc, res)
}
}
@@ -495,7 +495,7 @@ func TestCheckRegexpConstraint(t *testing.T) {
}
for _, tc := range cases {
_, ctx := testContext(t)
if res := checkRegexpConstraint(ctx, tc.lVal, tc.rVal); res != tc.result {
if res := checkRegexpMatch(ctx, tc.lVal, tc.rVal); res != tc.result {
t.Fatalf("TC: %#v, Result: %v", tc, res)
}
}

View File

@@ -121,10 +121,10 @@ func (p *propertySet) populateExisting(constraint *structs.Constraint) {
}
// Filter to the correct set of allocs
allocs = p.filterAllocs(allocs, true)
allocs = filterAllocs(allocs, true, p.taskGroup)
// Get all the nodes that have been used by the allocs
nodes, err := p.buildNodeMap(allocs)
nodes, err := buildNodeMap(p.ctx.State(), allocs)
if err != nil {
p.errorBuilding = err
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
@@ -132,7 +132,7 @@ func (p *propertySet) populateExisting(constraint *structs.Constraint) {
}
// Build existing properties map
p.populateProperties(allocs, nodes, p.existingValues)
populateProperties(p.constraint.LTarget, allocs, nodes, p.existingValues)
}
// PopulateProposed populates the proposed values and recomputes any cleared
@@ -149,20 +149,20 @@ func (p *propertySet) PopulateProposed() {
for _, updates := range p.ctx.Plan().NodeUpdate {
stopping = append(stopping, updates...)
}
stopping = p.filterAllocs(stopping, false)
stopping = filterAllocs(stopping, false, p.taskGroup)
// Gather the proposed allocations
var proposed []*structs.Allocation
for _, pallocs := range p.ctx.Plan().NodeAllocation {
proposed = append(proposed, pallocs...)
}
proposed = p.filterAllocs(proposed, true)
proposed = filterAllocs(proposed, true, p.taskGroup)
// Get the used nodes
both := make([]*structs.Allocation, 0, len(stopping)+len(proposed))
both = append(both, stopping...)
both = append(both, proposed...)
nodes, err := p.buildNodeMap(both)
nodes, err := buildNodeMap(p.ctx.State(), both)
if err != nil {
p.errorBuilding = err
p.ctx.Logger().Printf("[ERR] scheduler.dynamic-constraint: %v", err)
@@ -170,10 +170,10 @@ func (p *propertySet) PopulateProposed() {
}
// Populate the cleared values
p.populateProperties(stopping, nodes, p.clearedValues)
populateProperties(p.constraint.LTarget, stopping, nodes, p.clearedValues)
// Populate the proposed values
p.populateProperties(proposed, nodes, p.proposedValues)
populateProperties(p.constraint.LTarget, proposed, nodes, p.proposedValues)
// Remove any cleared value that is now being used by the proposed allocs
for value := range p.proposedValues {
@@ -247,7 +247,7 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin
// filterAllocs filters a set of allocations to just be those that are running
// and if the property set is operating at a task group level, for allocations
// for that task group
func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal bool) []*structs.Allocation {
func filterAllocs(allocs []*structs.Allocation, filterTerminal bool, taskGroup string) []*structs.Allocation {
n := len(allocs)
for i := 0; i < n; i++ {
remove := false
@@ -257,8 +257,8 @@ func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal
// If the constraint is on the task group filter the allocations to just
// those on the task group
if p.taskGroup != "" {
remove = remove || allocs[i].TaskGroup != p.taskGroup
if taskGroup != "" {
remove = remove || allocs[i].TaskGroup != taskGroup
}
if remove {
@@ -272,7 +272,7 @@ func (p *propertySet) filterAllocs(allocs []*structs.Allocation, filterTerminal
// buildNodeMap takes a list of allocations and returns a map of the nodes used
// by those allocations
func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*structs.Node, error) {
func buildNodeMap(state State, allocs []*structs.Allocation) (map[string]*structs.Node, error) {
// Get all the nodes that have been used by the allocs
nodes := make(map[string]*structs.Node)
ws := memdb.NewWatchSet()
@@ -281,7 +281,7 @@ func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*st
continue
}
node, err := p.ctx.State().NodeByID(ws, alloc.NodeID)
node, err := state.NodeByID(ws, alloc.NodeID)
if err != nil {
return nil, fmt.Errorf("failed to lookup node ID %q: %v", alloc.NodeID, err)
}
@@ -294,11 +294,11 @@ func (p *propertySet) buildNodeMap(allocs []*structs.Allocation) (map[string]*st
// populateProperties goes through all allocations and builds up the used
// properties from the nodes storing the results in the passed properties map.
func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map[string]*structs.Node,
func populateProperties(lTarget string, allocs []*structs.Allocation, nodes map[string]*structs.Node,
properties map[string]uint64) {
for _, alloc := range allocs {
nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget)
nProperty, ok := getProperty(nodes[alloc.NodeID], lTarget)
if !ok {
continue
}
@@ -313,7 +313,7 @@ func getProperty(n *structs.Node, property string) (string, bool) {
return "", false
}
val, ok := resolveConstraintTarget(property, n)
val, ok := resolveTarget(property, n)
if !ok {
return "", false
}

View File

@@ -3,15 +3,24 @@ package scheduler
import (
"fmt"
"math"
"github.com/hashicorp/nomad/nomad/structs"
)
const (
// binPackingMaxFitScore is the maximum possible bin packing fitness score.
// This is used to normalize bin packing score to a value between 0 and 1
binPackingMaxFitScore = 18.0
)
// Rank is used to provide a score and various ranking metadata
// along with a node when iterating. This state can be modified as
// various rank methods are applied.
type RankedNode struct {
Node *structs.Node
Score float64
FinalScore float64
Scores []float64
TaskResources map[string]*structs.Resources
// Allocs is used to cache the proposed allocations on the
@@ -20,7 +29,7 @@ type RankedNode struct {
}
func (r *RankedNode) GoString() string {
return fmt.Sprintf("<Node: %s Score: %0.3f>", r.Node.ID, r.Score)
return fmt.Sprintf("<Node: %s Score: %0.3f>", r.Node.ID, r.FinalScore)
}
func (r *RankedNode) ProposedAllocs(ctx Context) ([]*structs.Allocation, error) {
@@ -231,8 +240,9 @@ OUTER:
// Score the fit normally otherwise
fitness := structs.ScoreFit(option.Node, util)
option.Score += fitness
iter.ctx.Metrics().ScoreNode(option.Node, "binpack", fitness)
normalizedFit := float64(fitness) / float64(binPackingMaxFitScore)
option.Scores = append(option.Scores, normalizedFit)
iter.ctx.Metrics().ScoreNode(option.Node, "binpack", normalizedFit)
return option
}
}
@@ -245,26 +255,31 @@ func (iter *BinPackIterator) Reset() {
// along side other allocations from this job. This is used to help distribute
// load across the cluster.
type JobAntiAffinityIterator struct {
ctx Context
source RankIterator
penalty float64
jobID string
ctx Context
source RankIterator
jobID string
taskGroup string
desiredCount int
}
// NewJobAntiAffinityIterator is used to create a JobAntiAffinityIterator that
// applies the given penalty for co-placement with allocs from this job.
func NewJobAntiAffinityIterator(ctx Context, source RankIterator, penalty float64, jobID string) *JobAntiAffinityIterator {
func NewJobAntiAffinityIterator(ctx Context, source RankIterator, jobID string) *JobAntiAffinityIterator {
iter := &JobAntiAffinityIterator{
ctx: ctx,
source: source,
penalty: penalty,
jobID: jobID,
ctx: ctx,
source: source,
jobID: jobID,
}
return iter
}
func (iter *JobAntiAffinityIterator) SetJob(jobID string) {
iter.jobID = jobID
func (iter *JobAntiAffinityIterator) SetJob(job *structs.Job) {
iter.jobID = job.ID
}
func (iter *JobAntiAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.taskGroup = tg.Name
iter.desiredCount = tg.Count
}
func (iter *JobAntiAffinityIterator) Next() *RankedNode {
@@ -286,15 +301,16 @@ func (iter *JobAntiAffinityIterator) Next() *RankedNode {
// Determine the number of collisions
collisions := 0
for _, alloc := range proposed {
if alloc.JobID == iter.jobID {
if alloc.JobID == iter.jobID && alloc.TaskGroup == iter.taskGroup {
collisions += 1
}
}
// Apply a penalty if there are collisions
// Calculate the penalty based on number of collisions
// TODO(preetha): Figure out if batch jobs need a different scoring penalty where collisions matter less
if collisions > 0 {
scorePenalty := -1 * float64(collisions) * iter.penalty
option.Score += scorePenalty
scorePenalty := -1 * float64(collisions) / float64(iter.desiredCount)
option.Scores = append(option.Scores, scorePenalty)
iter.ctx.Metrics().ScoreNode(option.Node, "job-anti-affinity", scorePenalty)
}
return option
@@ -305,32 +321,30 @@ func (iter *JobAntiAffinityIterator) Reset() {
iter.source.Reset()
}
// NodeAntiAffinityIterator is used to apply a penalty to
// NodeReschedulingPenaltyIterator is used to apply a penalty to
// a node that had a previous failed allocation for the same job.
// This is used when attempting to reschedule a failed alloc
type NodeAntiAffinityIterator struct {
type NodeReschedulingPenaltyIterator struct {
ctx Context
source RankIterator
penalty float64
penaltyNodes map[string]struct{}
}
// NewNodeAntiAffinityIterator is used to create a NodeAntiAffinityIterator that
// applies the given penalty for placement onto nodes in penaltyNodes
func NewNodeAntiAffinityIterator(ctx Context, source RankIterator, penalty float64) *NodeAntiAffinityIterator {
iter := &NodeAntiAffinityIterator{
ctx: ctx,
source: source,
penalty: penalty,
// NewNodeReschedulingPenaltyIterator is used to create a NodeReschedulingPenaltyIterator that
// applies the given scoring penalty for placement onto nodes in penaltyNodes
func NewNodeReschedulingPenaltyIterator(ctx Context, source RankIterator) *NodeReschedulingPenaltyIterator {
iter := &NodeReschedulingPenaltyIterator{
ctx: ctx,
source: source,
}
return iter
}
func (iter *NodeAntiAffinityIterator) SetPenaltyNodes(penaltyNodes map[string]struct{}) {
func (iter *NodeReschedulingPenaltyIterator) SetPenaltyNodes(penaltyNodes map[string]struct{}) {
iter.penaltyNodes = penaltyNodes
}
func (iter *NodeAntiAffinityIterator) Next() *RankedNode {
func (iter *NodeReschedulingPenaltyIterator) Next() *RankedNode {
for {
option := iter.source.Next()
if option == nil {
@@ -339,14 +353,137 @@ func (iter *NodeAntiAffinityIterator) Next() *RankedNode {
_, ok := iter.penaltyNodes[option.Node.ID]
if ok {
option.Score -= iter.penalty
iter.ctx.Metrics().ScoreNode(option.Node, "node-anti-affinity", iter.penalty)
option.Scores = append(option.Scores, -1)
iter.ctx.Metrics().ScoreNode(option.Node, "node-reschedule-penalty", -1)
}
return option
}
}
func (iter *NodeAntiAffinityIterator) Reset() {
func (iter *NodeReschedulingPenaltyIterator) Reset() {
iter.penaltyNodes = make(map[string]struct{})
iter.source.Reset()
}
// NodeAffinityIterator is a RankIterator that scores nodes according to the
// affinities of the job, the task group, and the tasks being placed.
type NodeAffinityIterator struct {
	ctx    Context
	source RankIterator
	// jobAffinities holds the job level affinities; they are merged into
	// affinities once per task group in SetTaskGroup.
	jobAffinities []*structs.Affinity
	// affinities is the combined set of job, group, and task affinities
	// used when scoring each node option.
	affinities []*structs.Affinity
}
// NewNodeAffinityIterator returns a NodeAffinityIterator that draws node
// options from the given source iterator.
func NewNodeAffinityIterator(ctx Context, source RankIterator) *NodeAffinityIterator {
	return &NodeAffinityIterator{
		ctx:    ctx,
		source: source,
	}
}
// SetJob records the job level affinities so they can be merged with the
// task group and task affinities in SetTaskGroup.
func (iter *NodeAffinityIterator) SetJob(job *structs.Job) {
	if job.Affinities != nil {
		iter.jobAffinities = job.Affinities
	}
}
// SetTaskGroup merges the previously recorded job level affinities with the
// affinities of the given task group and all of its tasks into the working
// affinity set used for scoring.
func (iter *NodeAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) {
	// Appending a nil slice is a no-op, so the job, group, and task level
	// affinities can be merged without explicit nil checks.
	iter.affinities = append(iter.affinities, iter.jobAffinities...)
	iter.affinities = append(iter.affinities, tg.Affinities...)
	for _, t := range tg.Tasks {
		iter.affinities = append(iter.affinities, t.Affinities...)
	}
}
// Reset prepares the iterator for the next task group by resetting the
// source and clearing the merged affinity list.
func (iter *NodeAffinityIterator) Reset() {
	iter.source.Reset()
	// This method is called between each task group, so only reset the merged list
	iter.affinities = nil
}
// hasAffinities returns true if there are any merged affinities to score
// against.
func (iter *NodeAffinityIterator) hasAffinities() bool {
	return len(iter.affinities) > 0
}
// Next scores the next option from the source iterator against the merged
// affinity set. Each matching affinity contributes its weight normalized by
// the sum of absolute weights, so the total affinity score lies in [-1, 1].
func (iter *NodeAffinityIterator) Next() *RankedNode {
	option := iter.source.Next()
	if option == nil {
		return nil
	}
	// Nothing to score against; pass the option through unchanged
	if len(iter.affinities) == 0 {
		return option
	}
	// TODO(preetha): we should calculate normalized weights once and reuse it here
	sumWeight := 0.0
	for _, affinity := range iter.affinities {
		sumWeight += math.Abs(affinity.Weight)
	}
	// NOTE(review): if every affinity weight is zero, sumWeight is 0 and the
	// division below yields NaN/Inf — presumably zero weights are rejected by
	// job validation; confirm upstream.
	totalAffinityScore := 0.0
	for _, affinity := range iter.affinities {
		if matchesAffinity(iter.ctx, affinity, option.Node) {
			normScore := affinity.Weight / sumWeight
			totalAffinityScore += normScore
		}
	}
	// Only record a score when the affinities had a net effect
	if totalAffinityScore != 0.0 {
		option.Scores = append(option.Scores, totalAffinityScore)
		iter.ctx.Metrics().ScoreNode(option.Node, "node-affinity", totalAffinityScore)
	}
	return option
}
// matchesAffinity returns true if the node satisfies the given affinity once
// its left and right hand targets are resolved against the node.
func matchesAffinity(ctx Context, affinity *structs.Affinity, option *structs.Node) bool {
	// Resolve both targets; an unresolvable target can never satisfy an
	// affinity.
	lVal, lOk := resolveTarget(affinity.LTarget, option)
	rVal, rOk := resolveTarget(affinity.RTarget, option)
	if !lOk || !rOk {
		return false
	}
	// Check if satisfied
	return checkAffinity(ctx, affinity.Operand, lVal, rVal)
}
// ScoreNormalizationIterator is a RankIterator that averages the scores
// recorded by all upstream scoring iterators into a node's final score.
type ScoreNormalizationIterator struct {
	ctx    Context
	source RankIterator
}
// NewScoreNormalizationIterator returns a ScoreNormalizationIterator that
// wraps the given source iterator.
func NewScoreNormalizationIterator(ctx Context, source RankIterator) *ScoreNormalizationIterator {
	return &ScoreNormalizationIterator{
		ctx:    ctx,
		source: source,
	}
}
// Reset resets the source iterator.
func (iter *ScoreNormalizationIterator) Reset() {
	iter.source.Reset()
}
// Next takes the next option from the source iterator and sets its final
// score to the arithmetic mean of all recorded scorer results. Options with
// no recorded scores keep the zero final score.
func (iter *ScoreNormalizationIterator) Next() *RankedNode {
	option := iter.source.Next()
	if option == nil {
		return nil
	}
	if n := len(option.Scores); n > 0 {
		total := 0.0
		for _, s := range option.Scores {
			total += s
		}
		option.FinalScore = total / float64(n)
	}
	//TODO(preetha): Turn map in allocmetrics into a heap of topK scores
	iter.ctx.Metrics().ScoreNode(option.Node, "normalized-score", option.FinalScore)
	return option
}

View File

@@ -85,7 +85,9 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
if len(out) != 2 {
t.Fatalf("Bad: %v", out)
}
@@ -93,11 +95,11 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
t.Fatalf("Bad: %v", out)
}
if out[0].Score != 18 {
t.Fatalf("Bad: %v", out[0])
if out[0].FinalScore != 1.0 {
t.Fatalf("Bad Score: %v", out[0].FinalScore)
}
if out[1].Score < 10 || out[1].Score > 16 {
t.Fatalf("Bad: %v", out[1])
if out[1].FinalScore < 0.75 || out[1].FinalScore > 0.95 {
t.Fatalf("Bad Score: %v", out[1].FinalScore)
}
}
@@ -164,16 +166,18 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}
if out[0] != nodes[1] {
t.Fatalf("Bad: %v", out)
t.Fatalf("Bad Score: %v", out)
}
if out[0].Score != 18 {
t.Fatalf("Bad: %v", out[0])
if out[0].FinalScore != 1.0 {
t.Fatalf("Bad Score: %v", out[0].FinalScore)
}
}
@@ -254,15 +258,17 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}
if out[0] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
if out[0].Score != 18 {
t.Fatalf("Bad: %v", out[0])
if out[0].FinalScore != 1.0 {
t.Fatalf("Bad Score: %v", out[0].FinalScore)
}
}
@@ -348,18 +354,20 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
if len(out) != 2 {
t.Fatalf("Bad: %#v", out)
}
if out[0] != nodes[0] || out[1] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
if out[0].Score < 10 || out[0].Score > 16 {
t.Fatalf("Bad: %v", out[0])
if out[0].FinalScore < 0.50 || out[0].FinalScore > 0.95 {
t.Fatalf("Bad Score: %v", out[0].FinalScore)
}
if out[1].Score != 18 {
t.Fatalf("Bad: %v", out[1])
if out[1].FinalScore != 1 {
t.Fatalf("Bad Score: %v", out[1].FinalScore)
}
}
@@ -379,16 +387,23 @@ func TestJobAntiAffinity_PlannedAlloc(t *testing.T) {
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
job.ID = "foo"
tg := job.TaskGroups[0]
tg.Count = 4
// Add a planned alloc to node1 that fills it
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
ID: uuid.Generate(),
JobID: "foo",
ID: uuid.Generate(),
JobID: "foo",
TaskGroup: tg.Name,
},
{
ID: uuid.Generate(),
JobID: "foo",
ID: uuid.Generate(),
JobID: "foo",
TaskGroup: tg.Name,
},
}
@@ -399,24 +414,29 @@ func TestJobAntiAffinity_PlannedAlloc(t *testing.T) {
},
}
binp := NewJobAntiAffinityIterator(ctx, static, 5.0, "foo")
jobAntiAff := NewJobAntiAffinityIterator(ctx, static, "foo")
jobAntiAff.SetJob(job)
jobAntiAff.SetTaskGroup(tg)
out := collectRanked(binp)
scoreNorm := NewScoreNormalizationIterator(ctx, jobAntiAff)
out := collectRanked(scoreNorm)
if len(out) != 2 {
t.Fatalf("Bad: %#v", out)
}
if out[0] != nodes[0] {
t.Fatalf("Bad: %v", out)
}
if out[0].Score != -10.0 {
t.Fatalf("Bad: %#v", out[0])
// Score should be -(#collisions/desired_count) => -(2/4)
if out[0].FinalScore != -0.5 {
t.Fatalf("Bad Score: %#v", out[0].FinalScore)
}
if out[1] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
if out[1].Score != 0.0 {
t.Fatalf("Bad: %v", out[1])
if out[1].FinalScore != 0.0 {
t.Fatalf("Bad Score: %v", out[1].FinalScore)
}
}
@@ -450,17 +470,168 @@ func TestNodeAntiAffinity_PenaltyNodes(t *testing.T) {
}
static := NewStaticRankIterator(ctx, nodes)
nodeAntiAffIter := NewNodeAntiAffinityIterator(ctx, static, 50.0)
nodeAntiAffIter := NewNodeReschedulingPenaltyIterator(ctx, static)
nodeAntiAffIter.SetPenaltyNodes(map[string]struct{}{node1.ID: {}})
out := collectRanked(nodeAntiAffIter)
scoreNorm := NewScoreNormalizationIterator(ctx, nodeAntiAffIter)
out := collectRanked(scoreNorm)
require := require.New(t)
require.Equal(2, len(out))
require.Equal(node1.ID, out[0].Node.ID)
require.Equal(-50.0, out[0].Score)
require.Equal(-1.0, out[0].FinalScore)
require.Equal(node2.ID, out[1].Node.ID)
require.Equal(0.0, out[1].Score)
require.Equal(0.0, out[1].FinalScore)
}
// TestScoreNormalizationIterator verifies that the final score is the mean of
// all individual scorer results when more than one scorer contributes.
func TestScoreNormalizationIterator(t *testing.T) {
	// Test normalized scores when there is more than one scorer
	_, ctx := testContext(t)
	nodes := []*RankedNode{
		{
			Node: &structs.Node{
				ID: uuid.Generate(),
			},
		},
		{
			Node: &structs.Node{
				ID: uuid.Generate(),
			},
		},
	}
	static := NewStaticRankIterator(ctx, nodes)

	job := mock.Job()
	job.ID = "foo"
	tg := job.TaskGroups[0]
	tg.Count = 4

	// Add a planned alloc to node1 that fills it
	plan := ctx.Plan()
	plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
		{
			ID:        uuid.Generate(),
			JobID:     "foo",
			TaskGroup: tg.Name,
		},
		{
			ID:        uuid.Generate(),
			JobID:     "foo",
			TaskGroup: tg.Name,
		},
	}

	// Add a planned alloc to node2 that half fills it
	plan.NodeAllocation[nodes[1].Node.ID] = []*structs.Allocation{
		{
			JobID: "bar",
		},
	}

	jobAntiAff := NewJobAntiAffinityIterator(ctx, static, "foo")
	jobAntiAff.SetJob(job)
	jobAntiAff.SetTaskGroup(tg)

	nodeReschedulePenaltyIter := NewNodeReschedulingPenaltyIterator(ctx, jobAntiAff)
	nodeReschedulePenaltyIter.SetPenaltyNodes(map[string]struct{}{nodes[0].Node.ID: {}})

	scoreNorm := NewScoreNormalizationIterator(ctx, nodeReschedulePenaltyIter)

	out := collectRanked(scoreNorm)
	if len(out) != 2 {
		t.Fatalf("Bad: %#v", out)
	}
	if out[0] != nodes[0] {
		t.Fatalf("Bad: %v", out)
	}
	// Score should be averaged between both scorers:
	// -0.5 from job anti affinity and -1 from node rescheduling penalty
	if out[0].FinalScore != -0.75 {
		t.Fatalf("Bad Score: %#v", out[0].FinalScore)
	}
	if out[1] != nodes[1] {
		t.Fatalf("Bad Node: %v", out)
	}
	if out[1].FinalScore != 0.0 {
		t.Fatalf("Bad Score: %v", out[1].FinalScore)
	}
}
// TestNodeAffinityIterator verifies that node scores reflect the normalized
// sum of matching affinity weights, including negative (anti) affinities.
func TestNodeAffinityIterator(t *testing.T) {
	_, ctx := testContext(t)
	nodes := []*RankedNode{
		{Node: mock.Node()},
		{Node: mock.Node()},
		{Node: mock.Node()},
		{Node: mock.Node()},
	}

	nodes[0].Node.Attributes["kernel.version"] = "4.9"
	nodes[1].Node.Datacenter = "dc2"
	nodes[2].Node.Datacenter = "dc2"
	nodes[2].Node.NodeClass = "large"

	affinities := []*structs.Affinity{
		{
			Operand: "=",
			LTarget: "${node.datacenter}",
			RTarget: "dc1",
			Weight:  200,
		},
		{
			Operand: "=",
			LTarget: "${node.datacenter}",
			RTarget: "dc2",
			Weight:  -100,
		},
		{
			Operand: "version",
			LTarget: "${attr.kernel.version}",
			RTarget: ">4.0",
			Weight:  50,
		},
		{
			Operand: "is",
			LTarget: "${node.class}",
			RTarget: "large",
			Weight:  50,
		},
	}
	static := NewStaticRankIterator(ctx, nodes)

	job := mock.Job()
	job.ID = "foo"
	tg := job.TaskGroups[0]
	tg.Affinities = affinities

	nodeAffinity := NewNodeAffinityIterator(ctx, static)
	nodeAffinity.SetTaskGroup(tg)

	scoreNorm := NewScoreNormalizationIterator(ctx, nodeAffinity)

	out := collectRanked(scoreNorm)
	expectedScores := make(map[string]float64)
	// Total weight = 400
	// Node 0 matches two affinities (dc1 and kernel version), total weight = 250
	expectedScores[nodes[0].Node.ID] = 0.625
	// Node 1 matches the dc2 anti affinity, weight = -100
	expectedScores[nodes[1].Node.ID] = -0.25
	// Node 2 matches the dc2 anti affinity (-100) and the node class
	// affinity (50), net weight = -50
	expectedScores[nodes[2].Node.ID] = -0.125
	// Node 3 matches one affinity (dc1) with weight = 200
	expectedScores[nodes[3].Node.ID] = 0.5

	require := require.New(t)
	for _, n := range out {
		require.Equal(expectedScores[n.Node.ID], n.FinalScore)
	}
}

View File

@@ -43,7 +43,7 @@ func (iter *LimitIterator) Next() *RankedNode {
if len(iter.skippedNodes) < iter.maxSkip {
// Try skipping ahead up to maxSkip to find an option with score lesser than the threshold
for option != nil && option.Score <= iter.scoreThreshold && len(iter.skippedNodes) < iter.maxSkip {
for option != nil && option.FinalScore <= iter.scoreThreshold && len(iter.skippedNodes) < iter.maxSkip {
iter.skippedNodes = append(iter.skippedNodes, option)
option = iter.source.Next()
}
@@ -104,7 +104,7 @@ func (iter *MaxScoreIterator) Next() *RankedNode {
return iter.max
}
if iter.max == nil || option.Score > iter.max.Score {
if iter.max == nil || option.FinalScore > iter.max.FinalScore {
iter.max = option
}
}

View File

@@ -12,16 +12,16 @@ func TestLimitIterator(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: mock.Node(),
Score: 1,
Node: mock.Node(),
FinalScore: 1,
},
{
Node: mock.Node(),
Score: 2,
Node: mock.Node(),
FinalScore: 2,
},
{
Node: mock.Node(),
Score: 3,
Node: mock.Node(),
FinalScore: 3,
},
}
static := NewStaticRankIterator(ctx, nodes)
@@ -73,26 +73,26 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "Skips one low scoring node",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: 2,
Node: nodes[1],
FinalScore: 2,
},
{
Node: nodes[2],
Score: 3,
Node: nodes[2],
FinalScore: 3,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 2,
Node: nodes[1],
FinalScore: 2,
},
{
Node: nodes[2],
Score: 3,
Node: nodes[2],
FinalScore: 3,
},
},
threshold: -1,
@@ -103,30 +103,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "Skips maxSkip scoring nodes",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: -2,
Node: nodes[1],
FinalScore: -2,
},
{
Node: nodes[2],
Score: 3,
Node: nodes[2],
FinalScore: 3,
},
{
Node: nodes[3],
Score: 4,
Node: nodes[3],
FinalScore: 4,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[2],
Score: 3,
Node: nodes[2],
FinalScore: 3,
},
{
Node: nodes[3],
Score: 4,
Node: nodes[3],
FinalScore: 4,
},
},
threshold: -1,
@@ -137,30 +137,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "maxSkip limit reached",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: -6,
Node: nodes[1],
FinalScore: -6,
},
{
Node: nodes[2],
Score: -3,
Node: nodes[2],
FinalScore: -3,
},
{
Node: nodes[3],
Score: -4,
Node: nodes[3],
FinalScore: -4,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[2],
Score: -3,
Node: nodes[2],
FinalScore: -3,
},
{
Node: nodes[3],
Score: -4,
Node: nodes[3],
FinalScore: -4,
},
},
threshold: -1,
@@ -171,22 +171,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "draw both from skipped nodes",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: -6,
Node: nodes[1],
FinalScore: -6,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: -6,
Node: nodes[1],
FinalScore: -6,
},
},
threshold: -1,
@@ -196,22 +196,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "one node above threshold, one skipped node",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: 5,
Node: nodes[1],
FinalScore: 5,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 5,
Node: nodes[1],
FinalScore: 5,
},
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
},
threshold: -1,
@@ -222,30 +222,30 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "low scoring nodes interspersed",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
{
Node: nodes[1],
Score: 5,
Node: nodes[1],
FinalScore: 5,
},
{
Node: nodes[2],
Score: -2,
Node: nodes[2],
FinalScore: -2,
},
{
Node: nodes[3],
Score: 2,
Node: nodes[3],
FinalScore: 2,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 5,
Node: nodes[1],
FinalScore: 5,
},
{
Node: nodes[3],
Score: 2,
Node: nodes[3],
FinalScore: 2,
},
},
threshold: -1,
@@ -256,14 +256,14 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "only one node, score below threshold",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[0],
Score: -1,
Node: nodes[0],
FinalScore: -1,
},
},
threshold: -1,
@@ -274,22 +274,22 @@ func TestLimitIterator_ScoreThreshold(t *testing.T) {
desc: "maxSkip is more than available nodes",
nodes: []*RankedNode{
{
Node: nodes[0],
Score: -2,
Node: nodes[0],
FinalScore: -2,
},
{
Node: nodes[1],
Score: 1,
Node: nodes[1],
FinalScore: 1,
},
},
expectedOut: []*RankedNode{
{
Node: nodes[1],
Score: 1,
Node: nodes[1],
FinalScore: 1,
},
{
Node: nodes[0],
Score: -2,
Node: nodes[0],
FinalScore: -2,
},
},
threshold: -1,
@@ -320,16 +320,16 @@ func TestMaxScoreIterator(t *testing.T) {
_, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: mock.Node(),
Score: 1,
Node: mock.Node(),
FinalScore: 1,
},
{
Node: mock.Node(),
Score: 2,
Node: mock.Node(),
FinalScore: 2,
},
{
Node: mock.Node(),
Score: 3,
Node: mock.Node(),
FinalScore: 3,
},
}
static := NewStaticRankIterator(ctx, nodes)

View File

@@ -8,23 +8,10 @@ import (
)
const (
// serviceJobAntiAffinityPenalty is the penalty applied
// to the score for placing an alloc on a node that
// already has an alloc for this job.
serviceJobAntiAffinityPenalty = 20.0
// batchJobAntiAffinityPenalty is the same as the
// serviceJobAntiAffinityPenalty but for batch type jobs.
batchJobAntiAffinityPenalty = 10.0
// previousFailedAllocNodePenalty is a scoring penalty for nodes
// that a failed allocation was previously run on
previousFailedAllocNodePenalty = 50.0
// skipScoreThreshold is a threshold used in the limit iterator to skip nodes
// that have a score lower than this. -10 is the highest possible score for a
// node with penalty (based on batchJobAntiAffinityPenalty)
skipScoreThreshold = -10.0
// that have a score lower than this. -1 is the lowest possible score for a
// node with penalties (based on job anti-affinity and node rescheduling penalties).
skipScoreThreshold = 0.0
// maxSkip limits the number of nodes that can be skipped in the limit iterator
maxSkip = 3
@@ -66,9 +53,11 @@ type GenericStack struct {
distinctPropertyConstraint *DistinctPropertyIterator
binPack *BinPackIterator
jobAntiAff *JobAntiAffinityIterator
nodeAntiAff *NodeAntiAffinityIterator
nodeReschedulingPenalty *NodeReschedulingPenaltyIterator
limit *LimitIterator
maxScore *MaxScoreIterator
nodeAffinity *NodeAffinityIterator
scoreNorm *ScoreNormalizationIterator
}
// NewGenericStack constructs a stack used for selecting service placements
@@ -121,18 +110,17 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
s.binPack = NewBinPackIterator(ctx, rankSource, evict, 0)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job. The penalty
// is less for batch jobs as it matters less.
penalty := serviceJobAntiAffinityPenalty
if batch {
penalty = batchJobAntiAffinityPenalty
}
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, penalty, "")
// multiple allocations on the same node for this job.
s.jobAntiAff = NewJobAntiAffinityIterator(ctx, s.binPack, "")
s.nodeAntiAff = NewNodeAntiAffinityIterator(ctx, s.jobAntiAff, previousFailedAllocNodePenalty)
s.nodeReschedulingPenalty = NewNodeReschedulingPenaltyIterator(ctx, s.jobAntiAff)
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.nodeAffinity)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.nodeAntiAff, 2, skipScoreThreshold, maxSkip)
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
// Select the node with the maximum score for placement
s.maxScore = NewMaxScoreIterator(ctx, s.limit)
@@ -166,7 +154,8 @@ func (s *GenericStack) SetJob(job *structs.Job) {
s.distinctHostsConstraint.SetJob(job)
s.distinctPropertyConstraint.SetJob(job)
s.binPack.SetPriority(job.Priority)
s.jobAntiAff.SetJob(job.ID)
s.jobAntiAff.SetJob(job)
s.nodeAffinity.SetJob(job)
s.ctx.Eligibility().SetJob(job)
if contextual, ok := s.quota.(ContextualIterator); ok {
@@ -206,8 +195,13 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*R
s.distinctPropertyConstraint.SetTaskGroup(tg)
s.wrappedChecks.SetTaskGroup(tg.Name)
s.binPack.SetTaskGroup(tg)
s.jobAntiAff.SetTaskGroup(tg)
if options != nil {
s.nodeAntiAff.SetPenaltyNodes(options.PenaltyNodeIDs)
s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs)
}
s.nodeAffinity.SetTaskGroup(tg)
if s.nodeAffinity.hasAffinities() {
s.limit.SetLimit(math.MaxInt32)
}
if contextual, ok := s.quota.(ContextualIterator); ok {
@@ -241,6 +235,7 @@ type SystemStack struct {
taskGroupConstraint *ConstraintChecker
distinctPropertyConstraint *DistinctPropertyIterator
binPack *BinPackIterator
scoreNorm *ScoreNormalizationIterator
}
// NewSystemStack constructs a stack used for selecting service placements
@@ -283,6 +278,9 @@ func NewSystemStack(ctx Context) *SystemStack {
// by a particular task group. Enable eviction as system jobs are high
// priority.
s.binPack = NewBinPackIterator(ctx, rankSource, true, 0)
// Apply score normalization
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack)
return s
}
@@ -304,6 +302,7 @@ func (s *SystemStack) SetJob(job *structs.Job) {
func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*RankedNode, *structs.Resources) {
// Reset the binpack selector and context
s.scoreNorm.Reset()
s.binPack.Reset()
s.ctx.Reset()
start := time.Now()
@@ -323,7 +322,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*Ra
}
// Get the next option that satisfies the constraints.
option := s.binPack.Next()
option := s.scoreNorm.Next()
// Ensure that the task resources were specified
if option != nil && len(option.TaskResources) != len(tg.Tasks) {

View File

@@ -310,7 +310,8 @@ func TestServiceStack_Select_BinPack_Overflow(t *testing.T) {
if met.ClassExhausted["linux-medium-pci"] != 1 {
t.Fatalf("bad: %#v", met)
}
if len(met.Scores) != 1 {
// Expect two metrics, one with bin packing score and one with normalized score
if len(met.Scores) != 2 {
t.Fatalf("bad: %#v", met)
}
}
@@ -531,7 +532,8 @@ func TestSystemStack_Select_BinPack_Overflow(t *testing.T) {
if met.ClassExhausted["linux-medium-pci"] != 1 {
t.Fatalf("bad: %#v", met)
}
if len(met.Scores) != 1 {
// Should have two scores, one from bin packing and one from normalization
if len(met.Scores) != 2 {
t.Fatalf("bad: %#v", met)
}
}