Merge pull request #321 from hashicorp/f-unique-constraint

Add "distinctHost" constraint
This commit is contained in:
Alex Dadgar
2015-10-26 14:18:57 -07:00
10 changed files with 370 additions and 25 deletions

View File

@@ -7,6 +7,7 @@ import (
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"time"
@@ -244,18 +245,32 @@ func parseConstraints(result *[]*structs.Constraint, obj *hclobj.Object) error {
// If "version" is provided, set the operand
// to "version" and the value to the "RTarget"
if constraint, ok := m["version"]; ok {
m["Operand"] = "version"
if constraint, ok := m[structs.ConstraintVersion]; ok {
m["Operand"] = structs.ConstraintVersion
m["RTarget"] = constraint
}
// If "regexp" is provided, set the operand
// to "regexp" and the value to the "RTarget"
if constraint, ok := m["regexp"]; ok {
m["Operand"] = "regexp"
if constraint, ok := m[structs.ConstraintRegex]; ok {
m["Operand"] = structs.ConstraintRegex
m["RTarget"] = constraint
}
if value, ok := m[structs.ConstraintDistinctHosts]; ok {
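// The HCL decoder hands us the value as a string (e.g. "true"), so
// parse it into a boolean.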
enabled, err := strconv.ParseBool(value.(string))
if err != nil {
return err
}
// If it is not enabled, skip the constraint.
if !enabled {
continue
}
m["Operand"] = structs.ConstraintDistinctHosts
}
// Build the constraint
var c structs.Constraint
if err := mapstructure.WeakDecode(m, &c); err != nil {

View File

@@ -165,7 +165,7 @@ func TestParse(t *testing.T) {
Hard: true,
LTarget: "$attr.kernel.version",
RTarget: "~> 3.2",
- Operand: "version",
+ Operand: structs.ConstraintVersion,
},
},
},
@@ -185,7 +185,25 @@ func TestParse(t *testing.T) {
Hard: true,
LTarget: "$attr.kernel.version",
RTarget: "[0-9.]+",
- Operand: "regexp",
+ Operand: structs.ConstraintRegex,
},
},
},
false,
},
{
"distinctHosts-constraint.hcl",
&structs.Job{
ID: "foo",
Name: "foo",
Priority: 50,
Region: "global",
Type: "service",
Constraints: []*structs.Constraint{
&structs.Constraint{
Hard: true,
Operand: structs.ConstraintDistinctHosts,
},
},
},

View File

@@ -0,0 +1,5 @@
job "foo" {
constraint {
distinct_hosts = "true"
}
}

View File

@@ -1027,6 +1027,12 @@ func (t *Task) Validate() error {
return mErr.ErrorOrNil()
}
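// Constraint operands that receive special handling during validation
// and scheduling.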
const (
ConstraintDistinctHosts = "distinct_hosts"
ConstraintRegex = "regexp"
ConstraintVersion = "version"
)
// Constraints are used to restrict placement options in the case of
// a hard constraint, and used to prefer a placement in the case of
// a soft constraint.
@@ -1050,11 +1056,11 @@ func (c *Constraint) Validate() error {
// Perform additional validation based on operand
switch c.Operand {
case "regexp":
case ConstraintRegex:
if _, err := regexp.Compile(c.RTarget); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Regular expression failed to compile: %v", err))
}
case "version":
case ConstraintVersion:
if _, err := version.NewConstraint(c.RTarget); err != nil {
mErr.Errors = append(mErr.Errors, fmt.Errorf("Version constraint is invalid: %v", err))
}

View File

@@ -144,7 +144,7 @@ func TestConstraint_Validate(t *testing.T) {
}
// Perform additional regexp validation
c.Operand = "regexp"
c.Operand = ConstraintRegex
c.RTarget = "(foo"
err = c.Validate()
mErr = err.(*multierror.Error)
@@ -153,7 +153,7 @@ func TestConstraint_Validate(t *testing.T) {
}
// Perform version validation
c.Operand = "version"
c.Operand = ConstraintVersion
c.RTarget = "~> foo"
err = c.Validate()
mErr = err.(*multierror.Error)

View File

@@ -150,6 +150,106 @@ func (iter *DriverIterator) hasDrivers(option *structs.Node) bool {
return true
}
// ProposedAllocConstraintIterator is a FeasibleIterator which returns nodes
// that match constraints that are not static (like node attributes) but are
// instead affected by proposed alloc placements. Examples are distinct_hosts
// and tenancy constraints. This is used to filter on job and task group
// constraints.
type ProposedAllocConstraintIterator struct {
ctx Context
source FeasibleIterator
tg *structs.TaskGroup
job *structs.Job
// Store whether the Job or TaskGroup has a distinct_hosts constraint so
// it does not have to be recalculated every time Next() is called.
tgDistinctHosts bool
jobDistinctHosts bool
}
// NewProposedAllocConstraintIterator creates a ProposedAllocConstraintIterator
// from a source.
func NewProposedAllocConstraintIterator(ctx Context, source FeasibleIterator) *ProposedAllocConstraintIterator {
iter := &ProposedAllocConstraintIterator{
ctx: ctx,
source: source,
}
return iter
}
func (iter *ProposedAllocConstraintIterator) SetTaskGroup(tg *structs.TaskGroup) {
iter.tg = tg
iter.tgDistinctHosts = iter.hasDistinctHostsConstraint(tg.Constraints)
}
func (iter *ProposedAllocConstraintIterator) SetJob(job *structs.Job) {
iter.job = job
iter.jobDistinctHosts = iter.hasDistinctHostsConstraint(job.Constraints)
}
func (iter *ProposedAllocConstraintIterator) hasDistinctHostsConstraint(constraints []*structs.Constraint) bool {
for _, con := range constraints {
if con.Operand == structs.ConstraintDistinctHosts {
return true
}
}
return false
}
func (iter *ProposedAllocConstraintIterator) Next() *structs.Node {
for {
// Get the next option from the source
option := iter.source.Next()
// Hot-path if the option is nil or there are no distinct_hosts constraints.
if option == nil || !(iter.jobDistinctHosts || iter.tgDistinctHosts) {
return option
}
if !iter.satisfiesDistinctHosts(option) {
iter.ctx.Metrics().FilterNode(option, structs.ConstraintDistinctHosts)
continue
}
return option
}
}
// satisfiesDistinctHosts checks if the node satisfies a distinct_hosts
// constraint specified at either the job or the task group level.
func (iter *ProposedAllocConstraintIterator) satisfiesDistinctHosts(option *structs.Node) bool {
// Check if there is no constraint set.
if !(iter.jobDistinctHosts || iter.tgDistinctHosts) {
return true
}
// Get the proposed allocations
proposed, err := iter.ctx.ProposedAllocs(option.ID)
if err != nil {
iter.ctx.Logger().Printf(
"[ERR] scheduler.dynamic-constraint: failed to get proposed allocations: %v", err)
return false
}
// Skip the node if the task group has already been allocated on it.
for _, alloc := range proposed {
// If the job has a distinct_hosts constraint, an alloc collision on the
// JobID alone is enough to reject the node; if the constraint is on the
// TaskGroup, we need both a job and a task group collision.
jobCollision := alloc.JobID == iter.job.ID
taskCollision := alloc.TaskGroup == iter.tg.Name
if (iter.jobDistinctHosts && jobCollision) || (jobCollision && taskCollision) {
return false
}
}
return true
}
func (iter *ProposedAllocConstraintIterator) Reset() {
iter.source.Reset()
}
// ConstraintIterator is a FeasibleIterator which returns nodes
// that match a given set of constraints. This is used to filter
// on job, task group, and task constraints.
@@ -257,6 +357,14 @@ func resolveConstraintTarget(target string, node *structs.Node) (interface{}, bo
// checkConstraint checks if a constraint is satisfied
func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
// Check for constraints not handled by this iterator.
switch operand {
case structs.ConstraintDistinctHosts:
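// distinct_hosts feasibility is enforced by the
// ProposedAllocConstraintIterator, so it is treated as satisfied here.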
return true
default:
break
}
switch operand {
case "=", "==", "is":
return reflect.DeepEqual(lVal, rVal)
@@ -264,9 +372,9 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool {
return !reflect.DeepEqual(lVal, rVal)
case "<", "<=", ">", ">=":
return checkLexicalOrder(operand, lVal, rVal)
case "version":
case structs.ConstraintVersion:
return checkVersionConstraint(ctx, lVal, rVal)
case "regexp":
case structs.ConstraintRegex:
return checkRegexpConstraint(ctx, lVal, rVal)
default:
return false

View File

@@ -248,12 +248,12 @@ func TestCheckConstraint(t *testing.T) {
result: true,
},
{
- op: "version",
+ op: structs.ConstraintVersion,
lVal: "1.2.3", rVal: "~> 1.0",
result: true,
},
{
- op: "regexp",
+ op: structs.ConstraintRegex,
lVal: "foobarbaz", rVal: "[\\w]+",
result: true,
},
@@ -382,6 +382,180 @@ func TestCheckRegexpConstraint(t *testing.T) {
}
}
func TestProposedAllocConstraint_JobDistinctHosts(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)
// Create a job with a distinct_hosts constraint and two task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}
proposed := NewProposedAllocConstraintIterator(ctx, static)
proposed.SetTaskGroup(tg1)
proposed.SetJob(job)
out := collectFeasible(proposed)
if len(out) != 4 {
t.Fatalf("Bad: %#v", out)
}
selected := make(map[string]struct{}, 4)
for _, option := range out {
if _, ok := selected[option.ID]; ok {
t.Fatalf("selected node %v for more than one alloc", option)
}
selected[option.ID] = struct{}{}
}
}
func TestProposedAllocConstraint_JobDistinctHosts_Infeasible(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)
// Create a job with a distinct_hosts constraint and two task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}},
TaskGroups: []*structs.TaskGroup{tg1, tg2},
}
// Add allocs placing tg1 on node1 and tg2 on node2. This should make the
// job unsatisfiable.
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: job.ID,
},
// Should be ignored as it is a different job.
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: "ignore 2",
},
}
plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: tg2.Name,
JobID: job.ID,
},
// Should be ignored as it is a different job.
&structs.Allocation{
TaskGroup: tg1.Name,
JobID: "ignore 2",
},
}
proposed := NewProposedAllocConstraintIterator(ctx, static)
proposed.SetTaskGroup(tg1)
proposed.SetJob(job)
out := collectFeasible(proposed)
if len(out) != 0 {
t.Fatalf("Bad: %#v", out)
}
}
func TestProposedAllocConstraint_JobDistinctHosts_InfeasibleCount(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)
// Create a job with a distinct_hosts constraint and three task groups.
tg1 := &structs.TaskGroup{Name: "bar"}
tg2 := &structs.TaskGroup{Name: "baz"}
tg3 := &structs.TaskGroup{Name: "bam"}
job := &structs.Job{
ID: "foo",
Constraints: []*structs.Constraint{{Operand: structs.ConstraintDistinctHosts}},
TaskGroups: []*structs.TaskGroup{tg1, tg2, tg3},
}
proposed := NewProposedAllocConstraintIterator(ctx, static)
proposed.SetTaskGroup(tg1)
proposed.SetJob(job)
// It should not be able to place 3 task groups with only two nodes.
out := collectFeasible(proposed)
if len(out) != 2 {
t.Fatalf("Bad: %#v", out)
}
}
func TestProposedAllocConstraint_TaskGroupDistinctHosts(t *testing.T) {
_, ctx := testContext(t)
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
}
static := NewStaticIterator(ctx, nodes)
// Create a task group with a distinct_hosts constraint.
taskGroup := &structs.TaskGroup{
Name: "example",
Constraints: []*structs.Constraint{
{Operand: structs.ConstraintDistinctHosts},
},
}
// Add a planned alloc to node1.
plan := ctx.Plan()
plan.NodeAllocation[nodes[0].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: taskGroup.Name,
JobID: "foo",
},
}
// Add a planned alloc to node2 with the same task group name but a
// different job.
plan.NodeAllocation[nodes[1].ID] = []*structs.Allocation{
&structs.Allocation{
TaskGroup: taskGroup.Name,
JobID: "bar",
},
}
proposed := NewProposedAllocConstraintIterator(ctx, static)
proposed.SetTaskGroup(taskGroup)
proposed.SetJob(&structs.Job{ID: "foo"})
out := collectFeasible(proposed)
if len(out) != 1 {
t.Fatalf("Bad: %#v", out)
}
// Expect it to skip the first node as there is a previous alloc on it for
// the same task group.
if out[0] != nodes[1] {
t.Fatalf("Bad: %v", out)
}
}
func collectFeasible(iter FeasibleIterator) (out []*structs.Node) {
for {
next := iter.Next()

View File

@@ -35,16 +35,17 @@ type Stack interface {
// GenericStack is the Stack used for the Generic scheduler. It is
// designed to make better placement decisions at the cost of performance.
type GenericStack struct {
-	batch               bool
-	ctx                 Context
-	source              *StaticIterator
-	jobConstraint       *ConstraintIterator
-	taskGroupDrivers    *DriverIterator
-	taskGroupConstraint *ConstraintIterator
-	binPack             *BinPackIterator
-	jobAntiAff          *JobAntiAffinityIterator
-	limit               *LimitIterator
-	maxScore            *MaxScoreIterator
+	batch                   bool
+	ctx                     Context
+	source                  *StaticIterator
+	jobConstraint           *ConstraintIterator
+	taskGroupDrivers        *DriverIterator
+	taskGroupConstraint     *ConstraintIterator
+	proposedAllocConstraint *ProposedAllocConstraintIterator
+	binPack                 *BinPackIterator
+	jobAntiAff              *JobAntiAffinityIterator
+	limit                   *LimitIterator
+	maxScore                *MaxScoreIterator
}
// NewGenericStack constructs a stack used for selecting service placements
@@ -69,8 +70,11 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Filter on task group constraints second
s.taskGroupConstraint = NewConstraintIterator(ctx, s.taskGroupDrivers, nil)
// Filter on constraints that are affected by proposed allocations.
s.proposedAllocConstraint = NewProposedAllocConstraintIterator(ctx, s.taskGroupConstraint)
// Upgrade from feasible to rank iterator
- rankSource := NewFeasibleRankIterator(ctx, s.taskGroupConstraint)
+ rankSource := NewFeasibleRankIterator(ctx, s.proposedAllocConstraint)
// Apply the bin packing, this depends on the resources needed
// by a particular task group. Only enable eviction for the service
@@ -119,6 +123,7 @@ func (s *GenericStack) SetNodes(baseNodes []*structs.Node) {
func (s *GenericStack) SetJob(job *structs.Job) {
s.jobConstraint.SetConstraints(job.Constraints)
s.proposedAllocConstraint.SetJob(job)
s.binPack.SetPriority(job.Priority)
s.jobAntiAff.SetJob(job.ID)
}
@@ -135,6 +140,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup) (*RankedNode, *structs.Reso
// Update the parameters of iterators
s.taskGroupDrivers.SetDrivers(tgConstr.drivers)
s.taskGroupConstraint.SetConstraints(tgConstr.constraints)
s.proposedAllocConstraint.SetTaskGroup(tg)
s.binPack.SetTasks(tg.Tasks)
// Find the node with the max score

View File

@@ -561,6 +561,7 @@ func TestInplaceUpdate_Success(t *testing.T) {
updates := []allocTuple{{Alloc: alloc, TaskGroup: tg}}
stack := NewGenericStack(false, ctx)
stack.SetJob(job)
// Do the inplace update.
unplaced := inplaceUpdate(ctx, eval, job, stack, updates)

View File

@@ -237,6 +237,18 @@ The `constraint` object supports the following keys:
the attribute. This sets the operator to "regexp" and the `value`
to the regular expression.
* `distinct_hosts` - `distinct_hosts` accepts a boolean value; the default is
  `false`.

  When `distinct_hosts` is `true` at the job level, each instance of every
  task group in the job is placed on a separate host.

  When `distinct_hosts` is `true` at the task group level and the group's
  count is greater than one, each instance of that task group is placed on a
  separate host. Different task groups in the same job _may_ be co-scheduled.

  Tasks within a task group are always co-scheduled.
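For example, a minimal job file sketch using the constraint at both levels
(the job and group names are illustrative, and other required job fields are
omitted):

```
job "example" {
  # Job level: each instance of every task group in this job is placed
  # on a separate host.
  constraint {
    distinct_hosts = "true"
  }

  group "cache" {
    count = 3

    # Group level: each of the three "cache" instances is placed on a
    # separate host; other groups in the job could still share those
    # hosts absent the job-level constraint above.
    constraint {
      distinct_hosts = "true"
    }
  }
}
```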
Below is a table documenting the variables that can be interpreted:
<table class="table table-bordered table-striped">