Implement spread iterator that scores according to percentage of desired count in each target.

Adds this as a new step in the scheduling stack, along with unit tests.
Preetha Appan
2018-07-17 17:25:38 -05:00
parent a6d309c005
commit fd697272a7
5 changed files with 547 additions and 34 deletions
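The scoring idea in brief: each spread target's percentage is turned into a desired count for the task group, and a node whose attribute value is still below its desired count gets a boost proportional to the shortfall, scaled by that spread's share of the total spread weight. A minimal sketch of the arithmetic (a hypothetical standalone helper mirroring SpreadIterator.Next, not code from this commit):

package main

import "fmt"

// scoreBoost mirrors the boost arithmetic in SpreadIterator.Next: proportional
// to how far a target value is below its desired count, scaled by this
// spread's share of the total spread weight across the job.
func scoreBoost(used, desired float64, weight, sumWeights int) float64 {
	if used >= desired {
		return 0 // already at or above the desired count: no boost
	}
	return ((desired - used) / desired) * (float64(weight) / float64(sumWeights))
}

func main() {
	// A count of 10 spread 70/30 across two datacenters with a single
	// weight-100 spread: dc1 wants 7, dc2 wants 3. With 2 allocs already
	// in dc1 and none in dc2, nodes in dc2 receive the larger boost.
	fmt.Printf("dc1: %.3f\n", scoreBoost(2, 7, 100, 100)) // dc1: 0.714
	fmt.Printf("dc2: %.3f\n", scoreBoost(0, 3, 100, 100)) // dc2: 1.000
}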


@@ -602,6 +602,89 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// Test job registration with spread configured
func TestServiceSched_Spread(t *testing.T) {
h := NewHarness(t)
assert := assert.New(t)
// Create a job that uses spread over data center
job := mock.Job()
job.Datacenters = []string{"dc1", "dc2"}
job.TaskGroups[0].Count = 10
job.TaskGroups[0].Spreads = append(job.TaskGroups[0].Spreads,
&structs.Spread{
Attribute: "${node.datacenter}",
Weight: 100,
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 70,
},
{
Value: "dc2",
Percent: 30,
},
},
})
assert.Nil(h.State.UpsertJob(h.NextIndex(), job), "UpsertJob")
// Create some nodes, half in dc2
var nodes []*structs.Node
nodeMap := make(map[string]*structs.Node)
for i := 0; i < 6; i++ {
node := mock.Node()
if i%2 == 0 {
node.Datacenter = "dc2"
}
nodes = append(nodes, node)
assert.Nil(h.State.UpsertNode(h.NextIndex(), node), "UpsertNode")
nodeMap[node.ID] = node
}
// Create a mock evaluation to register the job
eval := &structs.Evaluation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
Priority: job.Priority,
TriggeredBy: structs.EvalTriggerJobRegister,
JobID: job.ID,
Status: structs.EvalStatusPending,
}
noErr(t, h.State.UpsertEvals(h.NextIndex(), []*structs.Evaluation{eval}))
// Process the evaluation
assert.Nil(h.Process(NewServiceScheduler, eval), "Process")
// Ensure a single plan
assert.Len(h.Plans, 1, "Number of plans")
plan := h.Plans[0]
// Ensure the plan doesn't have annotations.
assert.Nil(plan.Annotations, "Plan.Annotations")
// Ensure the eval hasn't spawned a blocked eval
assert.Len(h.CreateEvals, 0, "Created Evals")
// Ensure the plan allocated
var planned []*structs.Allocation
dcAllocsMap := make(map[string]int)
for nodeId, allocList := range plan.NodeAllocation {
planned = append(planned, allocList...)
dc := nodeMap[nodeId].Datacenter
c := dcAllocsMap[dc]
c += len(allocList)
dcAllocsMap[dc] = c
}
assert.Len(planned, 10, "Planned Allocations")
expectedCounts := make(map[string]int)
expectedCounts["dc1"] = 7
expectedCounts["dc2"] = 3
require.Equal(t, expectedCounts, dcAllocsMap)
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
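The expected counts follow directly from the spread percentages: with a group count of 10 and targets of 70% and 30%, the scheduler aims for 0.7 × 10 = 7 allocations in dc1 and 0.3 × 10 = 3 in dc2, which is what the test asserts.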
func TestServiceSched_JobRegister_Annotate(t *testing.T) {
h := NewHarness(t)


@@ -23,8 +23,8 @@ type propertySet struct {
// taskGroup is optionally set if the constraint is for a task group
taskGroup string
// constraint is the constraint this property set is checking
constraint *structs.Constraint
// targetAttribute is the attribute this property set is checking
targetAttribute string
// allowedCount is the allowed number of allocations that can have the
// distinct property
@@ -60,7 +60,7 @@ func NewPropertySet(ctx Context, job *structs.Job) *propertySet {
return p
}
- // SetJobConstraint is used to parameterize the property set for a
+ // SetJobConstraintAttribute is used to parameterize the property set for a
// distinct_property constraint set at the job level.
func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) {
p.setConstraint(constraint, "")
@@ -75,14 +75,7 @@ func (p *propertySet) SetTGConstraint(constraint *structs.Constraint, taskGroup
// setConstraint is a shared helper for setting a job or task group constraint.
func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup string) {
- // Store that this is for a task group
- if taskGroup != "" {
- p.taskGroup = taskGroup
- }
- // Store the constraint
- p.constraint = constraint
+ allowedCount := uint64(0)
// Determine the number of allowed allocations with the property.
if v := constraint.RTarget; v != "" {
c, err := strconv.ParseUint(v, 10, 64)
@@ -92,14 +85,35 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st
return
}
- p.allowedCount = c
+ allowedCount = c
} else {
- p.allowedCount = 1
+ allowedCount = 1
}
+ p.setPropertySetInner(constraint.LTarget, allowedCount, taskGroup)
+ }
+ // SetTargetAttribute is used to populate this property set without also
+ // storing an allowed count. This is used when evaluating spread stanzas.
+ func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) {
+ p.setPropertySetInner(targetAttribute, 0, taskGroup)
+ }
+ // setPropertySetInner is a shared helper for setting a job or task group attribute and allowedCount
+ // allowedCount can be zero when this is used in evaluating spread stanzas
+ func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount uint64, taskGroup string) {
+ // Store that this is for a task group
+ if taskGroup != "" {
+ p.taskGroup = taskGroup
+ }
+ // Store the target attribute
+ p.targetAttribute = targetAttribute
+ p.allowedCount = allowedCount
// Determine the number of existing allocations that are using a property
// value
- p.populateExisting(constraint)
+ p.populateExisting(targetAttribute)
// Populate the proposed when setting the constraint. We do this because
// when detecting if we can inplace update an allocation we stage an
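The refactor leaves the property set with two entry points: constraints still parse an allowed count out of RTarget, while spreads only register the attribute to track (allowedCount stays 0 and callers query UsedCount directly). A hedged usage sketch, as a hypothetical helper inside package scheduler:

func buildPropertySets(ctx Context, job *structs.Job, tg *structs.TaskGroup) []*propertySet {
	// distinct_property path: the allowed count is parsed out of RTarget
	cset := NewPropertySet(ctx, job)
	cset.SetJobConstraint(&structs.Constraint{
		Operand: structs.ConstraintDistinctProperty,
		LTarget: "${node.datacenter}",
		RTarget: "2", // at most two allocs per datacenter value
	})

	// spread path: only the attribute is tracked; callers use UsedCount
	// instead of SatisfiesDistinctProperties
	sset := NewPropertySet(ctx, job)
	sset.SetTargetAttribute("${node.datacenter}", tg.Name)

	return []*propertySet{cset, sset}
}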
@@ -110,7 +124,7 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st
// populateExisting is a helper shared when setting the constraint to populate
// the existing values.
- func (p *propertySet) populateExisting(constraint *structs.Constraint) {
+ func (p *propertySet) populateExisting(targetAttribute string) {
// Retrieve all previously placed allocations
ws := memdb.NewWatchSet()
allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false)
@@ -193,15 +207,32 @@ func (p *propertySet) PopulateProposed() {
// placements. If the option does not satisfy the constraints an explanation is
// given.
func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg string) (bool, string) {
+ nValue, errorMsg, usedCount := p.UsedCount(option, tg)
+ if errorMsg != "" {
+ return false, errorMsg
+ }
+ // The property value has been used but within the number of allowed
+ // allocations.
+ if usedCount < p.allowedCount {
+ return true, ""
+ }
+ return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.targetAttribute, nValue, usedCount)
+ }
+ // UsedCount returns the number of times the value of the attribute being tracked by this
+ // property set is used across current and proposed allocations. It also returns the resolved
+ // attribute value for the node, and an error message if it couldn't be resolved correctly
+ func (p *propertySet) UsedCount(option *structs.Node, tg string) (string, string, uint64) {
// Check if there was an error building
if p.errorBuilding != nil {
- return false, p.errorBuilding.Error()
+ return "", p.errorBuilding.Error(), 0
}
// Get the nodes property value
- nValue, ok := getProperty(option, p.constraint.LTarget)
+ nValue, ok := getProperty(option, p.targetAttribute)
if !ok {
- return false, fmt.Sprintf("missing property %q", p.constraint.LTarget)
+ return nValue, fmt.Sprintf("missing property %q", p.targetAttribute), 0
}
// combine the counts of how many times the property has been used by
@@ -229,19 +260,8 @@ func (p *propertySet) SatisfiesDistinctProperties(option *structs.Node, tg strin
}
}
- usedCount, used := combinedUse[nValue]
- if !used {
- // The property value has never been used so we can use it.
- return true, ""
- }
- // The property value has been used but within the number of allowed
- // allocations.
- if usedCount < p.allowedCount {
- return true, ""
- }
- return false, fmt.Sprintf("distinct_property: %s=%s used by %d allocs", p.constraint.LTarget, nValue, usedCount)
+ usedCount := combinedUse[nValue]
+ return nValue, "", usedCount
}
// filterAllocs filters a set of allocations to just be those that are running
@@ -298,7 +318,7 @@ func (p *propertySet) populateProperties(allocs []*structs.Allocation, nodes map
properties map[string]uint64) {
for _, alloc := range allocs {
- nProperty, ok := getProperty(nodes[alloc.NodeID], p.constraint.LTarget)
+ nProperty, ok := getProperty(nodes[alloc.NodeID], p.targetAttribute)
if !ok {
continue
}

scheduler/spread.go (new file, 155 lines)

@@ -0,0 +1,155 @@
package scheduler
import (
"github.com/hashicorp/nomad/nomad/structs"
)
// SpreadIterator is used to spread allocations across a specified attribute
// according to preset weights
type SpreadIterator struct {
ctx Context
source RankIterator
job *structs.Job
tg *structs.TaskGroup
jobSpreads []*structs.Spread
tgSpreadInfo map[string]spreadAttributeMap
sumSpreadWeights int
hasSpread bool
groupPropertySets map[string][]*propertySet
}
type spreadAttributeMap map[string]*spreadInfo
type spreadInfo struct {
sumWeight uint32
weight int
desiredCounts map[string]float64
}
func NewSpreadIterator(ctx Context, source RankIterator) *SpreadIterator {
iter := &SpreadIterator{
ctx: ctx,
source: source,
groupPropertySets: make(map[string][]*propertySet),
tgSpreadInfo: make(map[string]spreadAttributeMap),
}
return iter
}
func (iter *SpreadIterator) Reset() {
iter.source.Reset()
for _, sets := range iter.groupPropertySets {
for _, ps := range sets {
ps.PopulateProposed()
}
}
}
func (iter *SpreadIterator) SetJob(job *structs.Job) {
iter.job = job
if job.Spreads != nil {
iter.jobSpreads = job.Spreads
}
}
func (iter *SpreadIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
// Build the property set at the taskgroup level
if _, ok := iter.groupPropertySets[tg.Name]; !ok {
// First add property sets that are at the job level for this task group
for _, spread := range iter.jobSpreads {
pset := NewPropertySet(iter.ctx, iter.job)
pset.SetTargetAttribute(spread.Attribute, tg.Name)
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
}
// Include property sets at the task group level
for _, spread := range tg.Spreads {
pset := NewPropertySet(iter.ctx, iter.job)
pset.SetTargetAttribute(spread.Attribute, tg.Name)
iter.groupPropertySets[tg.Name] = append(iter.groupPropertySets[tg.Name], pset)
}
}
// Check if there are any spreads to evaluate
iter.hasSpread = len(iter.groupPropertySets[tg.Name]) != 0
// Build tgSpreadInfo at the task group level
if _, ok := iter.tgSpreadInfo[tg.Name]; !ok {
iter.computeSpreadInfo(tg)
}
}
func (iter *SpreadIterator) hasSpreads() bool {
return iter.hasSpread
}
func (iter *SpreadIterator) Next() *RankedNode {
for {
option := iter.source.Next()
// Hot path if there is nothing to check
if option == nil || !iter.hasSpreads() {
return option
}
tgName := iter.tg.Name
propertySets := iter.groupPropertySets[tgName]
// Iterate over each spread attribute's property set and add a weighted score
totalSpreadScore := 0.0
for _, pset := range propertySets {
nValue, errorMsg, usedCount := pset.UsedCount(option.Node, tgName)
if errorMsg != "" {
// Skip if there were errors resolving this attribute to compute used counts
continue
}
spreadAttributeMap := iter.tgSpreadInfo[tgName]
spreadDetails := spreadAttributeMap[pset.targetAttribute]
// Get the desired count
desiredCount, ok := spreadDetails.desiredCounts[nValue]
if !ok {
// Warn about missing ratio
iter.ctx.Logger().Printf("[WARN] sched: missing desired distribution percentage for attribute value %v in spread stanza for job %v", nValue, iter.job.ID)
continue
}
if float64(usedCount) < desiredCount {
// Calculate the relative weight of this specific spread attribute
spreadWeight := float64(spreadDetails.weight) / float64(iter.sumSpreadWeights)
// The score boost is proportional to the difference between the current
// and desired count. It is multiplied by the spread weight to account
// for cases where the job has more than one spread attribute
scoreBoost := ((desiredCount - float64(usedCount)) / desiredCount) * spreadWeight
totalSpreadScore += scoreBoost
}
}
if totalSpreadScore != 0.0 {
option.Scores = append(option.Scores, totalSpreadScore)
iter.ctx.Metrics().ScoreNode(option.Node, "allocation-spread", totalSpreadScore)
}
return option
}
}
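Note that Next only ever adds a non-negative boost: a value at or above its desired count contributes nothing rather than a penalty. Relative ranking still works out because under-target values keep scoring higher, and the single combined score appended here is folded into the node's final score by the downstream score-normalization iterator.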
// computeSpreadInfo computes and stores desired counts and weight totals
// from all spreads that apply to a specific task group
func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
spreadInfos := make(spreadAttributeMap, len(tg.Spreads))
totalCount := tg.Count
for _, spread := range tg.Spreads {
sumWeight := uint32(0)
for _, st := range spread.SpreadTarget {
sumWeight += st.Percent
}
si := &spreadInfo{sumWeight: sumWeight, weight: spread.Weight, desiredCounts: make(map[string]float64)}
for _, st := range spread.SpreadTarget {
desiredCount := (float64(st.Percent) / float64(sumWeight)) * float64(totalCount)
si.desiredCounts[st.Value] = desiredCount
}
spreadInfos[spread.Attribute] = si
iter.sumSpreadWeights += spread.Weight
}
iter.tgSpreadInfo[tg.Name] = spreadInfos
}
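One subtlety: desired counts are normalized by the sum of the target percentages rather than by 100, so targets that do not add up to 100 still yield a full distribution. A small sketch of the arithmetic (a hypothetical standalone helper mirroring computeSpreadInfo):

package main

import "fmt"

// desiredCounts mirrors computeSpreadInfo: each target's share is its percent
// divided by the sum of all percents, applied to the task group count.
func desiredCounts(percents map[string]uint32, count int) map[string]float64 {
	sum := uint32(0)
	for _, p := range percents {
		sum += p
	}
	out := make(map[string]float64, len(percents))
	for value, p := range percents {
		out[value] = float64(p) / float64(sum) * float64(count)
	}
	return out
}

func main() {
	// Percentages summing to 120 still normalize cleanly:
	// dc1 gets 80/120 of 6 = 4, dc2 gets 40/120 of 6 = 2.
	fmt.Println(desiredCounts(map[string]uint32{"dc1": 80, "dc2": 40}, 6))
}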

scheduler/spread_test.go (new file, 249 lines)

@@ -0,0 +1,249 @@
package scheduler
import (
"fmt"
"testing"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
func TestSpreadIterator_SingleAttribute(t *testing.T) {
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
if err := state.UpsertNode(uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 5
// add allocs to nodes in dc1
upserting := []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
if err := state.UpsertAllocs(1000, upserting); err != nil {
t.Fatalf("failed to UpsertAllocs: %v", err)
}
spread := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 80,
},
{
Value: "dc2",
Percent: 20,
},
},
}
tg.Spreads = []*structs.Spread{spread}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Expect nodes in both datacenters to get a boost
// Boost should be ((desiredCount - actual) / desiredCount) * spreadWeight
// For this test that gives dc1 = (4-2)/4 = 0.5 and dc2 = (1-0)/1 = 1.0
expectedScores := map[string]float64{
"dc1": 0.5,
"dc2": 1.0,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
// Update the plan to add more allocs to nodes in dc1
// After this step there are enough allocs to meet the desired count in dc1
ctx.plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
// Should be ignored as it is a different job.
{
Namespace: structs.DefaultNamespace,
TaskGroup: "bbb",
JobID: "ignore 2",
Job: job,
ID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
}
ctx.plan.NodeAllocation[nodes[3].Node.ID] = []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
NodeID: nodes[3].Node.ID,
},
}
// Reset the scores
for _, node := range nodes {
node.Scores = nil
node.FinalScore = 0
}
static = NewStaticRankIterator(ctx, nodes)
spreadIter = NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm = NewScoreNormalizationIterator(ctx, spreadIter)
out = collectRanked(scoreNorm)
// Expect nodes in dc2 to get a boost
// dc1 nodes are not boosted because there are already enough allocs to
// meet the desired count
expectedScores = map[string]float64{
"dc1": 0,
"dc2": 1.0,
}
for _, rn := range out {
require.Equal(t, expectedScores[rn.Node.Datacenter], rn.FinalScore)
}
}
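The second half of the test exercises the proposed-allocation path: allocations staged in ctx.plan.NodeAllocation count toward UsedCount just like persisted ones (they are picked up via PopulateProposed), so once the planned allocs bring dc1 up to its desired count of 4 the dc1 nodes stop receiving a boost.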
func TestSpreadIterator_MultipleAttributes(t *testing.T) {
state, ctx := testContext(t)
dcs := []string{"dc1", "dc2", "dc1", "dc1"}
rack := []string{"r1", "r1", "r2", "r2"}
var nodes []*RankedNode
// Add these nodes to the state store
for i, dc := range dcs {
node := mock.Node()
node.Datacenter = dc
node.Meta["rack"] = rack[i]
if err := state.UpsertNode(uint64(100+i), node); err != nil {
t.Fatalf("failed to upsert node: %v", err)
}
nodes = append(nodes, &RankedNode{Node: node})
}
static := NewStaticRankIterator(ctx, nodes)
job := mock.Job()
tg := job.TaskGroups[0]
job.TaskGroups[0].Count = 5
// add allocs to nodes in dc1
upserting := []*structs.Allocation{
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
},
{
Namespace: structs.DefaultNamespace,
TaskGroup: tg.Name,
JobID: job.ID,
Job: job,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[2].Node.ID,
},
}
if err := state.UpsertAllocs(1000, upserting); err != nil {
t.Fatalf("failed to UpsertAllocs: %v", err)
}
spread1 := &structs.Spread{
Weight: 100,
Attribute: "${node.datacenter}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "dc1",
Percent: 60,
},
{
Value: "dc2",
Percent: 40,
},
},
}
spread2 := &structs.Spread{
Weight: 50,
Attribute: "${meta.rack}",
SpreadTarget: []*structs.SpreadTarget{
{
Value: "r1",
Percent: 40,
},
{
Value: "r2",
Percent: 60,
},
},
}
tg.Spreads = []*structs.Spread{spread1, spread2}
spreadIter := NewSpreadIterator(ctx, static)
spreadIter.SetJob(job)
spreadIter.SetTaskGroup(tg)
scoreNorm := NewScoreNormalizationIterator(ctx, spreadIter)
out := collectRanked(scoreNorm)
// Scores come from combining two different spread factors
// The second node should have the highest score because it has no allocs and it's in dc2/r1
expectedScores := map[string]float64{
nodes[0].Node.ID: 0.389,
nodes[1].Node.ID: 0.833,
nodes[2].Node.ID: 0.444,
nodes[3].Node.ID: 0.444,
}
for _, rn := range out {
require.Equal(t, fmt.Sprintf("%.3f", expectedScores[rn.Node.ID]), fmt.Sprintf("%.3f", rn.FinalScore))
}
}
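These expected scores can be reproduced by hand. With a count of 5, spread1 (weight 100) yields desired counts dc1 = 3 and dc2 = 2, spread2 (weight 50) yields r1 = 2 and r2 = 3, and the weight shares are 100/150 = 2/3 and 50/150 = 1/3. dc1 already holds two allocs and each rack holds one, so node 0 (dc1/r1) scores (1/3)(2/3) + (1/2)(1/3) ≈ 0.389, node 1 (dc2/r1) scores (2/2)(2/3) + (1/2)(1/3) ≈ 0.833, and nodes 2 and 3 (dc1/r2) score (1/3)(2/3) + (2/3)(1/3) ≈ 0.444.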


@@ -57,6 +57,7 @@ type GenericStack struct {
limit *LimitIterator
maxScore *MaxScoreIterator
nodeAffinity *NodeAffinityIterator
+ spread *SpreadIterator
scoreNorm *ScoreNormalizationIterator
}
@@ -117,7 +118,9 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
s.nodeAffinity = NewNodeAffinityIterator(ctx, s.nodeReschedulingPenalty)
- s.scoreNorm = NewScoreNormalizationIterator(ctx, s.nodeAffinity)
+ s.spread = NewSpreadIterator(ctx, s.nodeAffinity)
+ s.scoreNorm = NewScoreNormalizationIterator(ctx, s.spread)
// Apply a limit function. This is to avoid scanning *every* possible node.
s.limit = NewLimitIterator(ctx, s.scoreNorm, 2, skipScoreThreshold, maxSkip)
@@ -156,6 +159,7 @@ func (s *GenericStack) SetJob(job *structs.Job) {
s.binPack.SetPriority(job.Priority)
s.jobAntiAff.SetJob(job)
s.nodeAffinity.SetJob(job)
+ s.spread.SetJob(job)
s.ctx.Eligibility().SetJob(job)
if contextual, ok := s.quota.(ContextualIterator); ok {
@@ -200,7 +204,9 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) (*R
s.nodeReschedulingPenalty.SetPenaltyNodes(options.PenaltyNodeIDs)
}
s.nodeAffinity.SetTaskGroup(tg)
- if s.nodeAffinity.hasAffinities() {
+ s.spread.SetTaskGroup(tg)
+ if s.nodeAffinity.hasAffinities() || s.spread.hasSpreads() {
s.limit.SetLimit(math.MaxInt32)
}
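As with affinities, raising the limit to MaxInt32 when spreads are present forces the stack to score every feasible node instead of stopping after the first couple of good candidates; spread scores are only meaningful relative to the full set of options.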