mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Address some review feedback
This commit is contained in:
@@ -365,6 +365,8 @@ func (iter *NodeReschedulingPenaltyIterator) Reset() {
|
||||
iter.source.Reset()
|
||||
}
|
||||
|
||||
// NodeAffinityIterator is used to resolve any affinity rules in the job or task group,
|
||||
// and apply a weighted score to nodes if they match.
|
||||
type NodeAffinityIterator struct {
|
||||
ctx Context
|
||||
source RankIterator
|
||||
@@ -372,6 +374,9 @@ type NodeAffinityIterator struct {
|
||||
affinities []*structs.Affinity
|
||||
}
|
||||
|
||||
// NewNodeAffinityIterator is used to create a NodeAffinityIterator that
|
||||
// applies a weighted score according to whether nodes match any
|
||||
// affinities in the job or task group.
|
||||
func NewNodeAffinityIterator(ctx Context, source RankIterator) *NodeAffinityIterator {
|
||||
return &NodeAffinityIterator{
|
||||
ctx: ctx,
|
||||
@@ -380,9 +385,7 @@ func NewNodeAffinityIterator(ctx Context, source RankIterator) *NodeAffinityIter
|
||||
}
|
||||
|
||||
func (iter *NodeAffinityIterator) SetJob(job *structs.Job) {
|
||||
if job.Affinities != nil {
|
||||
iter.jobAffinities = job.Affinities
|
||||
}
|
||||
iter.jobAffinities = job.Affinities
|
||||
}
|
||||
|
||||
func (iter *NodeAffinityIterator) SetTaskGroup(tg *structs.TaskGroup) {
|
||||
@@ -417,7 +420,7 @@ func (iter *NodeAffinityIterator) Next() *RankedNode {
|
||||
if option == nil {
|
||||
return nil
|
||||
}
|
||||
if len(iter.affinities) == 0 {
|
||||
if !iter.hasAffinities() {
|
||||
return option
|
||||
}
|
||||
// TODO(preetha): we should calculate normalized weights once and reuse it here
|
||||
@@ -441,6 +444,7 @@ func (iter *NodeAffinityIterator) Next() *RankedNode {
|
||||
}
|
||||
|
||||
func matchesAffinity(ctx Context, affinity *structs.Affinity, option *structs.Node) bool {
|
||||
//TODO(preetha): Add a step here that filters based on computed node class for potential speedup
|
||||
// Resolve the targets
|
||||
lVal, ok := resolveTarget(affinity.LTarget, option)
|
||||
if !ok {
|
||||
@@ -455,11 +459,16 @@ func matchesAffinity(ctx Context, affinity *structs.Affinity, option *structs.No
|
||||
return checkAffinity(ctx, affinity.Operand, lVal, rVal)
|
||||
}
|
||||
|
||||
// ScoreNormalizationIterator is used to combine scores from various prior
|
||||
// iterators and combine them into one final score. The current implementation
|
||||
// averages the scores together.
|
||||
type ScoreNormalizationIterator struct {
|
||||
ctx Context
|
||||
source RankIterator
|
||||
}
|
||||
|
||||
// NewScoreNormalizationIterator is used to create a ScoreNormalizationIterator that
|
||||
// averages scores from various iterators into a final score.
|
||||
func NewScoreNormalizationIterator(ctx Context, source RankIterator) *ScoreNormalizationIterator {
|
||||
return &ScoreNormalizationIterator{
|
||||
ctx: ctx,
|
||||
@@ -472,17 +481,15 @@ func (iter *ScoreNormalizationIterator) Reset() {
|
||||
|
||||
func (iter *ScoreNormalizationIterator) Next() *RankedNode {
|
||||
option := iter.source.Next()
|
||||
if option == nil {
|
||||
return nil
|
||||
if option == nil || len(option.Scores) == 0 {
|
||||
return option
|
||||
}
|
||||
numScorers := len(option.Scores)
|
||||
if numScorers > 0 {
|
||||
sum := 0.0
|
||||
for _, score := range option.Scores {
|
||||
sum += score
|
||||
}
|
||||
option.FinalScore = sum / float64(numScorers)
|
||||
sum := 0.0
|
||||
for _, score := range option.Scores {
|
||||
sum += score
|
||||
}
|
||||
option.FinalScore = sum / float64(numScorers)
|
||||
//TODO(preetha): Turn map in allocmetrics into a heap of topK scores
|
||||
iter.ctx.Metrics().ScoreNode(option.Node, "normalized-score", option.FinalScore)
|
||||
return option
|
||||
|
||||
@@ -541,24 +541,15 @@ func TestScoreNormalizationIterator(t *testing.T) {
|
||||
scoreNorm := NewScoreNormalizationIterator(ctx, nodeReschedulePenaltyIter)
|
||||
|
||||
out := collectRanked(scoreNorm)
|
||||
if len(out) != 2 {
|
||||
t.Fatalf("Bad: %#v", out)
|
||||
}
|
||||
if out[0] != nodes[0] {
|
||||
t.Fatalf("Bad: %v", out)
|
||||
}
|
||||
require := require.New(t)
|
||||
|
||||
require.Equal(2, len(out))
|
||||
require.Equal(out[0], nodes[0])
|
||||
// Score should be averaged between both scorers
|
||||
// -0.5 from job anti affinity and -1 from node rescheduling penalty
|
||||
if out[0].FinalScore != -0.75 {
|
||||
t.Fatalf("Bad Score: %#v", out[0].FinalScore)
|
||||
}
|
||||
|
||||
if out[1] != nodes[1] {
|
||||
t.Fatalf("Bad Node: %v", out)
|
||||
}
|
||||
if out[1].FinalScore != 0.0 {
|
||||
t.Fatalf("Bad Score: %v", out[1].FinalScore)
|
||||
}
|
||||
require.Equal(out[0].FinalScore, -0.75)
|
||||
require.Equal(out[1], nodes[1])
|
||||
require.Equal(out[1].FinalScore, 0.0)
|
||||
}
|
||||
|
||||
func TestNodeAffinityIterator(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user