Change types of weights on spread/affinity

This commit is contained in:
Alex Dadgar
2019-01-30 12:20:38 -08:00
parent 7b98db133b
commit bc42873e07
19 changed files with 102 additions and 74 deletions

View File

@@ -79,13 +79,13 @@ func TestCompose(t *testing.T) {
LTarget: "${node.class}",
RTarget: "large",
Operand: "=",
Weight: 50,
Weight: int8ToPtr(50),
},
},
Spreads: []*Spread{
{
Attribute: "${node.datacenter}",
Weight: intToPtr(30),
Weight: int8ToPtr(30),
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",

View File

@@ -726,6 +726,9 @@ func (j *Job) Canonicalize() {
for _, spread := range j.Spreads {
spread.Canonicalize()
}
for _, a := range j.Affinities {
a.Canonicalize()
}
}
// LookupTaskGroup finds a task group by name

View File

@@ -1352,13 +1352,13 @@ func TestJobs_AddAffinity(t *testing.T) {
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
Weight: int8ToPtr(100),
},
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
Weight: int8ToPtr(50),
},
}
if !reflect.DeepEqual(job.Affinities, expect) {
@@ -1412,7 +1412,7 @@ func TestJobs_AddSpread(t *testing.T) {
expect := []*Spread{
{
Attribute: "${meta.rack}",
Weight: intToPtr(100),
Weight: int8ToPtr(100),
SpreadTarget: []*SpreadTarget{
{
Value: "r1",
@@ -1422,7 +1422,7 @@ func TestJobs_AddSpread(t *testing.T) {
},
{
Attribute: "${node.datacenter}",
Weight: intToPtr(100),
Weight: int8ToPtr(100),
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",

View File

@@ -223,4 +223,8 @@ func (d *RequestedDevice) Canonicalize() {
if d.Count == nil {
d.Count = uint64ToPtr(1)
}
for _, a := range d.Affinities {
a.Canonicalize()
}
}

View File

@@ -159,18 +159,24 @@ func (r *ReschedulePolicy) Canonicalize(jobType string) {
// Affinity is used to serialize task group affinities
type Affinity struct {
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Constraint operand (<=, <, =, !=, >, >=), set_contains_all, set_contains_any
Weight float64 // Weight applied to nodes that match the affinity. Can be negative
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Constraint operand (<=, <, =, !=, >, >=), set_contains_all, set_contains_any
Weight *int8 // Weight applied to nodes that match the affinity. Can be negative
}
func NewAffinity(LTarget string, Operand string, RTarget string, Weight float64) *Affinity {
func NewAffinity(LTarget string, Operand string, RTarget string, Weight int8) *Affinity {
return &Affinity{
LTarget: LTarget,
RTarget: RTarget,
Operand: Operand,
Weight: Weight,
Weight: int8ToPtr(Weight),
}
}
func (a *Affinity) Canonicalize() {
if a.Weight == nil {
a.Weight = int8ToPtr(50)
}
}
@@ -237,34 +243,34 @@ func (p *ReschedulePolicy) String() string {
// Spread is used to serialize task group allocation spread preferences
type Spread struct {
Attribute string
Weight *int
Weight *int8
SpreadTarget []*SpreadTarget
}
// SpreadTarget is used to serialize target allocation spread percentages
type SpreadTarget struct {
Value string
Percent uint32
Percent uint8
}
func NewSpreadTarget(value string, percent uint32) *SpreadTarget {
func NewSpreadTarget(value string, percent uint8) *SpreadTarget {
return &SpreadTarget{
Value: value,
Percent: percent,
}
}
func NewSpread(attribute string, weight int, spreadTargets []*SpreadTarget) *Spread {
func NewSpread(attribute string, weight int8, spreadTargets []*SpreadTarget) *Spread {
return &Spread{
Attribute: attribute,
Weight: intToPtr(weight),
Weight: int8ToPtr(weight),
SpreadTarget: spreadTargets,
}
}
func (s *Spread) Canonicalize() {
if s.Weight == nil {
s.Weight = intToPtr(50)
s.Weight = int8ToPtr(50)
}
}
@@ -596,7 +602,9 @@ func (g *TaskGroup) Canonicalize(job *Job) {
for _, spread := range g.Spreads {
spread.Canonicalize()
}
for _, a := range g.Affinities {
a.Canonicalize()
}
}
// Constrain is used to add a constraint to a task group.
@@ -713,6 +721,9 @@ func (t *Task) Canonicalize(tg *TaskGroup, job *Job) {
for _, s := range t.Services {
s.Canonicalize(t, tg, job)
}
for _, a := range t.Affinities {
a.Canonicalize()
}
}
// TaskArtifact is used to download artifacts before running a task.

View File

@@ -79,13 +79,13 @@ func TestTaskGroup_AddAffinity(t *testing.T) {
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
Weight: int8ToPtr(100),
},
{
LTarget: "${node.affinity}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
Weight: int8ToPtr(50),
},
}
if !reflect.DeepEqual(grp.Affinities, expect) {
@@ -143,7 +143,7 @@ func TestTaskGroup_AddSpread(t *testing.T) {
expect := []*Spread{
{
Attribute: "${meta.rack}",
Weight: intToPtr(100),
Weight: int8ToPtr(100),
SpreadTarget: []*SpreadTarget{
{
Value: "r1",
@@ -153,7 +153,7 @@ func TestTaskGroup_AddSpread(t *testing.T) {
},
{
Attribute: "${node.datacenter}",
Weight: intToPtr(100),
Weight: int8ToPtr(100),
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
@@ -340,13 +340,13 @@ func TestTask_AddAffinity(t *testing.T) {
LTarget: "kernel.version",
RTarget: "4.6",
Operand: "=",
Weight: 100,
Weight: int8ToPtr(100),
},
{
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 50,
Weight: int8ToPtr(50),
},
}
if !reflect.DeepEqual(task.Affinities, expect) {
@@ -766,7 +766,7 @@ func TestSpread_Canonicalize(t *testing.T) {
type testCase struct {
desc string
spread *Spread
expectedWeight int
expectedWeight int8
}
cases := []testCase{
{
@@ -781,7 +781,7 @@ func TestSpread_Canonicalize(t *testing.T) {
"Zero spread",
&Spread{
Attribute: "test",
Weight: intToPtr(0),
Weight: int8ToPtr(0),
},
0,
},
@@ -789,7 +789,7 @@ func TestSpread_Canonicalize(t *testing.T) {
"Non Zero spread",
&Spread{
Attribute: "test",
Weight: intToPtr(100),
Weight: int8ToPtr(100),
},
100,
},

View File

@@ -11,6 +11,11 @@ func boolToPtr(b bool) *bool {
return &b
}
// int8ToPtr returns the pointer to an int
func int8ToPtr(i int8) *int8 {
return &i
}
// intToPtr returns the pointer to an int
func intToPtr(i int) *int {
return &i

View File

@@ -949,7 +949,7 @@ func ApiAffinityToStructs(a1 *api.Affinity) *structs.Affinity {
LTarget: a1.LTarget,
Operand: a1.Operand,
RTarget: a1.RTarget,
Weight: a1.Weight,
Weight: *a1.Weight,
}
}

View File

@@ -65,6 +65,11 @@ func IntToPtr(i int) *int {
return &i
}
// Int8ToPtr returns the pointer to an int
func Int8ToPtr(i int8) *int8 {
return &i
}
// Int64ToPtr returns the pointer to an int
func Int64ToPtr(i int64) *int64 {
return &i

View File

@@ -50,14 +50,14 @@ func TestParse(t *testing.T) {
LTarget: "${meta.team}",
RTarget: "mobile",
Operand: "=",
Weight: 50,
Weight: helper.Int8ToPtr(50),
},
},
Spreads: []*api.Spread{
{
Attribute: "${meta.rack}",
Weight: helper.IntToPtr(100),
Weight: helper.Int8ToPtr(100),
SpreadTarget: []*api.SpreadTarget{
{
Value: "r1",
@@ -114,7 +114,7 @@ func TestParse(t *testing.T) {
LTarget: "${node.datacenter}",
RTarget: "dc2",
Operand: "=",
Weight: 100,
Weight: helper.Int8ToPtr(100),
},
},
Meta: map[string]string{
@@ -131,7 +131,7 @@ func TestParse(t *testing.T) {
Spreads: []*api.Spread{
{
Attribute: "${node.datacenter}",
Weight: helper.IntToPtr(50),
Weight: helper.Int8ToPtr(50),
SpreadTarget: []*api.SpreadTarget{
{
Value: "dc1",
@@ -189,7 +189,7 @@ func TestParse(t *testing.T) {
LTarget: "${meta.foo}",
RTarget: "a,b,c",
Operand: "set_contains",
Weight: 25,
Weight: helper.Int8ToPtr(25),
},
},
Services: []*api.Service{
@@ -245,7 +245,7 @@ func TestParse(t *testing.T) {
LTarget: "${device.model}",
RTarget: "1080ti",
Operand: "=",
Weight: 50,
Weight: helper.Int8ToPtr(50),
},
},
},

View File

@@ -6455,11 +6455,11 @@ func (c *Constraint) Validate() error {
// Affinity is used to score placement options based on a weight
type Affinity struct {
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Affinity operand (<=, <, =, !=, >, >=), set_contains_all, set_contains_any
Weight float64 // Weight applied to nodes that match the affinity. Can be negative
str string // Memoized string
LTarget string // Left-hand target
RTarget string // Right-hand target
Operand string // Affinity operand (<=, <, =, !=, >, >=), set_contains_all, set_contains_any
Weight int8 // Weight applied to nodes that match the affinity. Can be negative
str string // Memoized string
}
// Equal checks if two affinities are equal
@@ -6539,7 +6539,7 @@ type Spread struct {
// Weight is the relative weight of this spread, useful when there are multiple
// spread and affinities
Weight int
Weight int8
// SpreadTarget is used to describe desired percentages for each attribute value
SpreadTarget []*SpreadTarget
@@ -6589,7 +6589,7 @@ func (s *Spread) Validate() error {
if target.Percent < 0 || target.Percent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target percentage for value %q must be between 0 and 100", target.Value)))
}
sumPercent += target.Percent
sumPercent += uint32(target.Percent)
}
if sumPercent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Sum of spread target percentages must not be greater than 100%%; got %d%%", sumPercent)))
@@ -6603,7 +6603,7 @@ type SpreadTarget struct {
Value string
// Percent is the desired percentage of allocs
Percent uint32
Percent uint8
// Memoized string representation
str string

View File

@@ -1642,7 +1642,7 @@ func TestAffinity_Validate(t *testing.T) {
Operand: "=",
LTarget: "${meta.node_class}",
RTarget: "c4",
Weight: 500,
Weight: 110,
},
err: fmt.Errorf("Affinity weight must be within the range [-100,100]"),
},
@@ -1659,7 +1659,7 @@ func TestAffinity_Validate(t *testing.T) {
Operand: "version",
LTarget: "${meta.os}",
RTarget: ">>2.0",
Weight: 500,
Weight: 110,
},
err: fmt.Errorf("Version affinity is invalid"),
},
@@ -4078,7 +4078,7 @@ func TestSpread_Validate(t *testing.T) {
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 200,
Weight: 110,
},
err: fmt.Errorf("Spread stanza must have a positive weight from 0 to 100"),
name: "Invalid weight",

View File

@@ -77,14 +77,14 @@ func (d *deviceAllocator) AssignDevice(ask *structs.RequestedDevice) (out *struc
lVal, lOk := resolveDeviceTarget(a.LTarget, devInst.Device)
rVal, rOk := resolveDeviceTarget(a.RTarget, devInst.Device)
totalWeight += math.Abs(a.Weight)
totalWeight += math.Abs(float64(a.Weight))
// Check if satisfied
if !checkAttributeAffinity(d.ctx, a.Operand, lVal, rVal, lOk, rOk) {
continue
}
choiceScore += a.Weight
sumMatchedWeights += a.Weight
choiceScore += float64(a.Weight)
sumMatchedWeights += float64(a.Weight)
}
// normalize

View File

@@ -274,7 +274,7 @@ func TestDeviceAllocator_Allocate_Affinities(t *testing.T) {
LTarget: "${device.attr.cuda_cores}",
Operand: ">",
RTarget: "4000",
Weight: 0.6,
Weight: 60,
},
},
ExpectedDevice: nvidia1,
@@ -286,7 +286,7 @@ func TestDeviceAllocator_Allocate_Affinities(t *testing.T) {
LTarget: "${device.attr.cuda_cores}",
Operand: "<",
RTarget: "4000",
Weight: 0.1,
Weight: 10,
},
},
ExpectedDevice: nvidia0,
@@ -298,7 +298,7 @@ func TestDeviceAllocator_Allocate_Affinities(t *testing.T) {
LTarget: "${device.attr.cuda_cores}",
Operand: ">",
RTarget: "4000",
Weight: -0.2,
Weight: -20,
},
},
ZeroScore: true,
@@ -312,19 +312,19 @@ func TestDeviceAllocator_Allocate_Affinities(t *testing.T) {
LTarget: "${device.attr.memory_bandwidth}",
Operand: ">",
RTarget: "10 GB/s",
Weight: 0.2,
Weight: 20,
},
{
LTarget: "${device.attr.memory}",
Operand: "is",
RTarget: "11264 MiB",
Weight: 0.2,
Weight: 20,
},
{
LTarget: "${device.attr.graphics_clock}",
Operand: ">",
RTarget: "1.4 GHz",
Weight: 0.9,
Weight: 90,
},
},
ExpectedDevice: nvidia0,

View File

@@ -615,14 +615,14 @@ func TestServiceSched_JobRegister_DistinctProperty_TaskGroup_Incr(t *testing.T)
func TestServiceSched_Spread(t *testing.T) {
assert := assert.New(t)
start := uint32(100)
step := uint32(10)
start := uint8(100)
step := uint8(10)
for i := 0; i < 10; i++ {
name := fmt.Sprintf("%d%% in dc1", start)
t.Run(name, func(t *testing.T) {
h := NewHarness(t)
remaining := uint32(100 - start)
remaining := uint8(100 - start)
// Create a job that uses spread over data center
job := mock.Job()
job.Datacenters = []string{"dc1", "dc2"}

View File

@@ -324,7 +324,7 @@ OUTER:
// Add the scores
if len(req.Affinities) != 0 {
for _, a := range req.Affinities {
totalDeviceAffinityWeight += math.Abs(a.Weight)
totalDeviceAffinityWeight += math.Abs(float64(a.Weight))
}
sumMatchingAffinities += sumAffinities
}
@@ -572,13 +572,13 @@ func (iter *NodeAffinityIterator) Next() *RankedNode {
// TODO(preetha): we should calculate normalized weights once and reuse it here
sumWeight := 0.0
for _, affinity := range iter.affinities {
sumWeight += math.Abs(affinity.Weight)
sumWeight += math.Abs(float64(affinity.Weight))
}
totalAffinityScore := 0.0
for _, affinity := range iter.affinities {
if matchesAffinity(iter.ctx, affinity, option.Node) {
totalAffinityScore += affinity.Weight
totalAffinityScore += float64(affinity.Weight)
}
}
normScore := totalAffinityScore / sumWeight

View File

@@ -587,7 +587,7 @@ func TestBinPackIterator_Devices(t *testing.T) {
LTarget: "${device.attr.graphics_clock}",
Operand: ">",
RTarget: "1.4 GHz",
Weight: 0.9,
Weight: 90,
},
},
},
@@ -990,7 +990,7 @@ func TestNodeAffinityIterator(t *testing.T) {
Operand: "=",
LTarget: "${node.datacenter}",
RTarget: "dc1",
Weight: 200,
Weight: 100,
},
{
Operand: "=",
@@ -1026,18 +1026,18 @@ func TestNodeAffinityIterator(t *testing.T) {
out := collectRanked(scoreNorm)
expectedScores := make(map[string]float64)
// Total weight = 400
// Node 0 matches two affinities(dc and kernel version), total weight =250
expectedScores[nodes[0].Node.ID] = 0.625
// Total weight = 300
// Node 0 matches two affinities(dc and kernel version), total weight = 150
expectedScores[nodes[0].Node.ID] = 0.5
// Node 1 matches an anti affinity, weight = -100
expectedScores[nodes[1].Node.ID] = -0.25
expectedScores[nodes[1].Node.ID] = -(1.0 / 3.0)
// Node 2 matches one affinity(node class) with weight 50
expectedScores[nodes[2].Node.ID] = -0.125
expectedScores[nodes[2].Node.ID] = -(1.0 / 6.0)
// Node 3 matches one affinity (dc) with weight = 200
expectedScores[nodes[3].Node.ID] = 0.5
// Node 3 matches one affinity (dc) with weight = 100
expectedScores[nodes[3].Node.ID] = 1.0 / 3.0
require := require.New(t)
for _, n := range out {

View File

@@ -28,7 +28,7 @@ type SpreadIterator struct {
// sumSpreadWeights tracks the total weight across all spread
// stanzas
sumSpreadWeights int
sumSpreadWeights int32
// hasSpread is used to early return when the job/task group
// does not have spread configured
@@ -43,7 +43,7 @@ type SpreadIterator struct {
type spreadAttributeMap map[string]*spreadInfo
type spreadInfo struct {
weight int
weight int8
desiredCounts map[string]float64
}
@@ -246,7 +246,7 @@ func (iter *SpreadIterator) computeSpreadInfo(tg *structs.TaskGroup) {
si.desiredCounts[implicitTarget] = remainingCount
}
spreadInfos[spread.Attribute] = si
iter.sumSpreadWeights += spread.Weight
iter.sumSpreadWeights += int32(spread.Weight)
}
iter.tgSpreadInfo[tg.Name] = spreadInfos
}

View File

@@ -98,7 +98,7 @@ restricted only to nodes that meet the constraint's criteria.
or any [Nomad interpolated
values](/docs/runtime/interpolation.html#interpreted_node_vars).
- `weight` `(integer:0)` - Specifies a weight for the affinity. The weight is used
- `weight` `(integer: 50)` - Specifies a weight for the affinity. The weight is used
during scoring and must be an integer between -100 to 100. Negative weights act as
anti affinities, causing nodes that match them to be scored lower. Weights can be used
when there is more than one affinity to express relative preference across them.