Allow empty spread targets, and validate target percentages.

This commit is contained in:
Preetha Appan
2018-07-25 19:08:25 -05:00
parent ffab506f84
commit fa7e3a3f35
3 changed files with 116 additions and 48 deletions

View File

@@ -2189,10 +2189,16 @@ func (j *Job) Validate() error {
}
}
for idx, spread := range j.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
if j.Type == JobTypeSystem {
if j.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a s stanza"))
}
} else {
for idx, spread := range j.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}
@@ -3466,10 +3472,16 @@ func (tg *TaskGroup) Validate(j *Job) error {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Task Group %v should have a restart policy", tg.Name))
}
for idx, spread := range tg.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
if j.Type == JobTypeSystem {
if tg.Spreads != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("System jobs may not have a spread stanza"))
}
} else {
for idx, spread := range tg.Spreads {
if err := spread.Validate(); err != nil {
outer := fmt.Errorf("Spread %d validation failed: %s", idx+1, err)
mErr.Errors = append(mErr.Errors, outer)
}
}
}
@@ -5407,19 +5419,18 @@ func (a *Affinity) Validate() error {
return mErr.ErrorOrNil()
}
// Spread is used to specify desired distribution of allocations according to weight
type Spread struct {
Attribute string
Weight int
// Attribute is the node attribute used as the spread criteria
Attribute string
// Weight is the relative weight of this spread, useful when there are multiple
// spread and affinities
Weight int
// SpreadTarget is used to describe desired percentages for each attribute value
SpreadTarget []*SpreadTarget
str string
}
type SpreadTarget struct {
Value string
Percent uint32
str string
}
func (s *Spread) Copy() *Spread {
if s == nil {
return nil
@@ -5431,16 +5442,6 @@ func (s *Spread) Copy() *Spread {
return ns
}
func (s *SpreadTarget) Copy() *SpreadTarget {
if s == nil {
return nil
}
ns := new(SpreadTarget)
*ns = *s
return ns
}
func (s *Spread) String() string {
if s.str != "" {
return s.str
@@ -5449,14 +5450,6 @@ func (s *Spread) String() string {
return s.str
}
func (s *SpreadTarget) String() string {
if s.str != "" {
return s.str
}
s.str = fmt.Sprintf("%s %v", s.Value, s.Percent)
return s.str
}
func (s *Spread) Validate() error {
var mErr multierror.Error
if s.Attribute == "" {
@@ -5465,12 +5458,9 @@ func (s *Spread) Validate() error {
if s.Weight <= 0 || s.Weight > 100 {
mErr.Errors = append(mErr.Errors, errors.New("Spread stanza must have a positive weight from 0 to 100"))
}
if len(s.SpreadTarget) == 0 {
// TODO(preetha): This should go away if we can assume even spread if there are no targets
// In that case, the target percentages should be calculated at schedule time
mErr.Errors = append(mErr.Errors, errors.New("Atleast one spread target value must be specified"))
}
seen := make(map[string]struct{})
sumPercent := uint32(0)
for _, target := range s.SpreadTarget {
// Make sure there are no duplicates
_, ok := seen[target.Value]
@@ -5479,10 +5469,45 @@ func (s *Spread) Validate() error {
} else {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target value %q already defined", target.Value)))
}
if target.Percent < 0 || target.Percent > 100 {
mErr.Errors = append(mErr.Errors, errors.New(fmt.Sprintf("Spread target percentage for value %q must be between 0 and 100", target.Value)))
}
sumPercent += target.Percent
}
if sumPercent > 100 {
mErr.Errors = append(mErr.Errors, errors.New("Sum of spread target percentages must not be greater than 100"))
}
return mErr.ErrorOrNil()
}
// SpreadTarget is used to specify desired percentages
// for each attribute value
type SpreadTarget struct {
// Value is a single attribute value, like "dc1"
Value string
// Percent is the desired percentage of allocs
Percent uint32
str string
}
func (s *SpreadTarget) Copy() *SpreadTarget {
if s == nil {
return nil
}
ns := new(SpreadTarget)
*ns = *s
return ns
}
func (s *SpreadTarget) String() string {
if s.str != "" {
return s.str
}
s.str = fmt.Sprintf("%q %v%%", s.Value, s.Percent)
return s.str
}
// EphemeralDisk is an ephemeral disk object
type EphemeralDisk struct {
// Sticky indicates whether the allocation is sticky to a node

View File

@@ -405,6 +405,21 @@ func TestJob_SystemJob_Validate(t *testing.T) {
err = j.Validate()
require.NotNil(t, err)
require.Contains(t, err.Error(), "System jobs may not have an affinity stanza")
// Add spread at job and task group level, that should fail validation
j.Spreads = []*Spread{{
Attribute: "${node.datacenter}",
Weight: 100,
}}
j.TaskGroups[0].Spreads = []*Spread{{
Attribute: "${node.datacenter}",
Weight: 100,
}}
err = j.Validate()
require.NotNil(t, err)
require.Contains(t, err.Error(), "System jobs may not have a spread stanza")
}
func TestJob_VaultPolicies(t *testing.T) {
@@ -3960,9 +3975,37 @@ func TestSpread_Validate(t *testing.T) {
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 25,
},
{
Value: "dc2",
Percent: 150,
},
},
},
err: fmt.Errorf("Atleast one spread target value must be specified"),
name: "No spread targets",
err: fmt.Errorf("Spread target percentage for value \"dc2\" must be between 0 and 100"),
name: "Invalid percentages",
},
{
spread: &Spread{
Attribute: "${node.datacenter}",
Weight: 50,
SpreadTarget: []*SpreadTarget{
{
Value: "dc1",
Percent: 75,
},
{
Value: "dc2",
Percent: 75,
},
},
},
err: fmt.Errorf("Sum of spread target percentages must not be greater than 100"),
name: "Invalid percentages",
},
{
spread: &Spread{

View File

@@ -60,7 +60,7 @@ func NewPropertySet(ctx Context, job *structs.Job) *propertySet {
return p
}
// SetJobConstraintAttribute is used to parameterize the property set for a
// SetJobConstraint is used to parameterize the property set for a
// distinct_property constraint set at the job level.
func (p *propertySet) SetJobConstraint(constraint *structs.Constraint) {
p.setConstraint(constraint, "")
@@ -89,18 +89,18 @@ func (p *propertySet) setConstraint(constraint *structs.Constraint, taskGroup st
} else {
allowedCount = 1
}
p.setPropertySetInner(constraint.LTarget, allowedCount, taskGroup)
p.setTargetAttributeWithCount(constraint.LTarget, allowedCount, taskGroup)
}
// SetTargetAttribute is used to populate this property set without also storing allowed count
// This is used when evaluating spread stanzas
func (p *propertySet) SetTargetAttribute(targetAttribute string, taskGroup string) {
p.setPropertySetInner(targetAttribute, 0, taskGroup)
p.setTargetAttributeWithCount(targetAttribute, 0, taskGroup)
}
// setConstraint is a shared helper for setting a job or task group attribute and allowedCount
// setTargetAttributeWithCount is a shared helper for setting a job or task group attribute and allowedCount
// allowedCount can be zero when this is used in evaluating spread stanzas
func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount uint64, taskGroup string) {
func (p *propertySet) setTargetAttributeWithCount(targetAttribute string, allowedCount uint64, taskGroup string) {
// Store that this is for a task group
if taskGroup != "" {
p.taskGroup = taskGroup
@@ -113,7 +113,7 @@ func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount u
// Determine the number of existing allocations that are using a property
// value
p.populateExisting(targetAttribute)
p.populateExisting()
// Populate the proposed when setting the constraint. We do this because
// when detecting if we can inplace update an allocation we stage an
@@ -124,7 +124,7 @@ func (p *propertySet) setPropertySetInner(targetAttribute string, allowedCount u
// populateExisting is a helper shared when setting the constraint to populate
// the existing values.
func (p *propertySet) populateExisting(targetAttribute string) {
func (p *propertySet) populateExisting() {
// Retrieve all previously placed allocations
ws := memdb.NewWatchSet()
allocs, err := p.ctx.State().AllocsByJob(ws, p.namespace, p.jobID, false)