diff --git a/jobspec/parse.go b/jobspec/parse.go index daa76c294..71f4faa42 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -655,8 +655,8 @@ func parseAffinities(result *[]*api.Affinity, list *ast.ObjectList) error { // If "set_contains_any" is provided, set the operand // to "set_contains_any" and the value to the "RTarget" - if affinity, ok := m[structs.ConstraintSetContaintsAny]; ok { - m["Operand"] = structs.ConstraintSetContaintsAny + if affinity, ok := m[structs.ConstraintSetContainsAny]; ok { + m["Operand"] = structs.ConstraintSetContainsAny m["RTarget"] = affinity } diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 0f9fb0ee3..a0cbfbe78 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -27,9 +27,13 @@ import ( "container/heap" "math" + hcodec "github.com/hashicorp/go-msgpack/codec" + multierror "github.com/hashicorp/go-multierror" + + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/gorhill/cronexpr" "github.com/hashicorp/consul/api" - multierror "github.com/hashicorp/go-multierror" "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/helper" @@ -38,8 +42,6 @@ import ( "github.com/hashicorp/nomad/lib/kheap" "github.com/mitchellh/copystructure" "github.com/ugorji/go/codec" - - hcodec "github.com/hashicorp/go-msgpack/codec" ) var ( @@ -1721,6 +1723,26 @@ func (r *Resources) DiskInBytes() int64 { return int64(r.DiskMB * BytesInMegabyte) } +func (r *Resources) Validate() error { + var mErr multierror.Error + if err := r.MeetsMinResources(); err != nil { + mErr.Errors = append(mErr.Errors, err) + } + + // Ensure the task isn't asking for disk resources + if r.DiskMB > 0 { + mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level.")) + } + + for i, d := range r.Devices { + if err := d.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, fmt.Errorf("device %d failed validation: %v", i+1, err)) + } + } + + return mErr.ErrorOrNil() +} + // Merge merges this resource with another resource. func (r *Resources) Merge(other *Resources) { if other.CPU != 0 { @@ -2033,7 +2055,6 @@ type RequestedDevice struct { // Count is the number of requested devices Count uint64 - // TODO validate // Constraints are a set of constraints to apply when selecting the device // to use. Constraints []*Constraint @@ -2055,20 +2076,82 @@ func (r *RequestedDevice) Copy() *RequestedDevice { return &nr } +func (r *RequestedDevice) ID() *DeviceIdTuple { + if r == nil || r.Name == "" { + return nil + } + + parts := strings.SplitN(r.Name, "/", 3) + switch len(parts) { + case 1: + return &DeviceIdTuple{ + Type: parts[0], + } + case 2: + return &DeviceIdTuple{ + Vendor: parts[0], + Type: parts[1], + } + default: + return &DeviceIdTuple{ + Vendor: parts[0], + Type: parts[1], + Name: parts[2], + } + } +} + +func (r *RequestedDevice) Validate() error { + if r == nil { + return nil + } + + var mErr multierror.Error + if r.Name == "" { + multierror.Append(&mErr, errors.New("device name must be given as one of the following: type, vendor/type, or vendor/type/name")) + } + + for idx, constr := range r.Constraints { + // Ensure that the constraint doesn't use an operand we do not allow + switch constr.Operand { + case ConstraintDistinctHosts, ConstraintDistinctProperty: + outer := fmt.Errorf("Constraint %d validation failed: using unsupported operand %q", idx+1, constr.Operand) + multierror.Append(&mErr, outer) + default: + if err := constr.Validate(); err != nil { + outer := fmt.Errorf("Constraint %d validation failed: %s", idx+1, err) + multierror.Append(&mErr, outer) + } + } + } + for idx, affinity := range r.Affinities { + if err := affinity.Validate(); err != nil { + outer := fmt.Errorf("Affinity %d validation failed: %s", idx+1, err) + multierror.Append(&mErr, outer) + } + } + + return mErr.ErrorOrNil() +} + // NodeResources is used to define the resources available on a client node. type NodeResources struct { Cpu NodeCpuResources Memory NodeMemoryResources Disk NodeDiskResources Networks Networks + Devices []*NodeDeviceResource } func (n *NodeResources) Copy() *NodeResources { if n == nil { return nil } + newN := new(NodeResources) *newN = *n + + // Copy the networks if n.Networks != nil { networks := len(n.Networks) newN.Networks = make([]*NetworkResource, networks) @@ -2076,6 +2159,16 @@ func (n *NodeResources) Copy() *NodeResources { newN.Networks[i] = n.Networks[i].Copy() } } + + // Copy the devices + if n.Devices != nil { + devices := len(n.Devices) + newN.Devices = make([]*NodeDeviceResource, devices) + for i := 0; i < devices; i++ { + newN.Devices[i] = n.Devices[i].Copy() + } + } + return newN } @@ -2115,6 +2208,10 @@ func (n *NodeResources) Merge(o *NodeResources) { if len(o.Networks) != 0 { n.Networks = o.Networks } + + if len(o.Devices) != 0 { + n.Devices = o.Devices + } } func (n *NodeResources) Equals(o *NodeResources) bool { @@ -2145,6 +2242,20 @@ func (n *NodeResources) Equals(o *NodeResources) bool { } } + // Check the devices + if len(n.Devices) != len(o.Devices) { + return false + } + idMap := make(map[DeviceIdTuple]*NodeDeviceResource, len(n.Devices)) + for _, d := range n.Devices { + idMap[*d.ID()] = d + } + for _, otherD := range o.Devices { + if d, ok := idMap[*otherD.ID()]; !ok || !d.Equals(otherD) { + return false + } + } + return true } @@ -2244,6 +2355,208 @@ func (n *NodeDiskResources) Equals(o *NodeDiskResources) bool { return true } +// DeviceIdTuple is the tuple that identifies a device +type DeviceIdTuple struct { + Vendor string + Type string + Name string +} + +// Matches returns if this Device ID is a superset of the passed ID. +func (id *DeviceIdTuple) Matches(other *DeviceIdTuple) bool { + if other == nil { + return false + } + + if other.Name != "" && other.Name != id.Name { + return false + } + + if other.Vendor != "" && other.Vendor != id.Vendor { + return false + } + + if other.Type != "" && other.Type != id.Type { + return false + } + + return true +} + +// NodeDeviceResource captures a set of devices sharing a common +// vendor/type/device_name tuple. +type NodeDeviceResource struct { + Vendor string + Type string + Name string + Instances []*NodeDevice + Attributes map[string]*psstructs.Attribute +} + +func (n *NodeDeviceResource) ID() *DeviceIdTuple { + if n == nil { + return nil + } + + return &DeviceIdTuple{ + Vendor: n.Vendor, + Type: n.Type, + Name: n.Name, + } +} + +func (n *NodeDeviceResource) Copy() *NodeDeviceResource { + if n == nil { + return nil + } + + // Copy the primitives + nn := *n + + // Copy the device instances + if l := len(nn.Instances); l != 0 { + nn.Instances = make([]*NodeDevice, 0, l) + for _, d := range n.Instances { + nn.Instances = append(nn.Instances, d.Copy()) + } + } + + // Copy the Attributes + nn.Attributes = psstructs.CopyMapStringAttribute(nn.Attributes) + + return &nn +} + +func (n *NodeDeviceResource) Equals(o *NodeDeviceResource) bool { + if o == nil && n == nil { + return true + } else if o == nil { + return false + } else if n == nil { + return false + } + + if n.Vendor != o.Vendor { + return false + } else if n.Type != o.Type { + return false + } else if n.Name != o.Name { + return false + } + + // Check the attributes + if len(n.Attributes) != len(o.Attributes) { + return false + } + for k, v := range n.Attributes { + if otherV, ok := o.Attributes[k]; !ok || v != otherV { + return false + } + } + + // Check the instances + if len(n.Instances) != len(o.Instances) { + return false + } + idMap := make(map[string]*NodeDevice, len(n.Instances)) + for _, d := range n.Instances { + idMap[d.ID] = d + } + for _, otherD := range o.Instances { + if d, ok := idMap[otherD.ID]; !ok || !d.Equals(otherD) { + return false + } + } + + return true +} + +// NodeDevice is an instance of a particular device. +type NodeDevice struct { + // ID is the ID of the device. + ID string + + // Healthy captures whether the device is healthy. + Healthy bool + + // HealthDescription is used to provide a human readable description of why + // the device may be unhealthy. + HealthDescription string + + // Locality stores HW locality information for the node to optionally be + // used when making placement decisions. + Locality *NodeDeviceLocality +} + +func (n *NodeDevice) Equals(o *NodeDevice) bool { + if o == nil && n == nil { + return true + } else if o == nil { + return false + } else if n == nil { + return false + } + + if n.ID != o.ID { + return false + } else if n.Healthy != o.Healthy { + return false + } else if n.HealthDescription != o.HealthDescription { + return false + } else if !n.Locality.Equals(o.Locality) { + return false + } + + return false +} + +func (n *NodeDevice) Copy() *NodeDevice { + if n == nil { + return nil + } + + // Copy the primitives + nn := *n + + // Copy the locality + nn.Locality = nn.Locality.Copy() + + return &nn +} + +// NodeDeviceLocality stores information about the devices hardware locality on +// the node. +type NodeDeviceLocality struct { + // PciBusID is the PCI Bus ID for the device. + PciBusID string +} + +func (n *NodeDeviceLocality) Equals(o *NodeDeviceLocality) bool { + if o == nil && n == nil { + return true + } else if o == nil { + return false + } else if n == nil { + return false + } + + if n.PciBusID != o.PciBusID { + return false + } + + return true +} + +func (n *NodeDeviceLocality) Copy() *NodeDeviceLocality { + if n == nil { + return nil + } + + // Copy the primitives + nn := *n + return &nn +} + // NodeReservedResources is used to capture the resources on a client node that // should be reserved and not made available to jobs. type NodeReservedResources struct { @@ -4783,15 +5096,8 @@ func (t *Task) Validate(ephemeralDisk *EphemeralDisk, jobType string) error { // Validate the resources. if t.Resources == nil { mErr.Errors = append(mErr.Errors, errors.New("Missing task resources")) - } else { - if err := t.Resources.MeetsMinResources(); err != nil { - mErr.Errors = append(mErr.Errors, err) - } - - // Ensure the task isn't asking for disk resources - if t.Resources.DiskMB > 0 { - mErr.Errors = append(mErr.Errors, errors.New("Task can't ask for disk resources, they have to be specified at the task group level.")) - } + } else if err := t.Resources.Validate(); err != nil { + mErr.Errors = append(mErr.Errors, err) } // Validate the log config @@ -5804,7 +6110,7 @@ const ( ConstraintVersion = "version" ConstraintSetContains = "set_contains" ConstraintSetContainsAll = "set_contains_all" - ConstraintSetContaintsAny = "set_contains_any" + ConstraintSetContainsAny = "set_contains_any" ) // Constraints are used to restrict placement options. @@ -5853,7 +6159,7 @@ func (c *Constraint) Validate() error { switch c.Operand { case ConstraintDistinctHosts: requireLtarget = false - case ConstraintSetContains: + case ConstraintSetContainsAll, ConstraintSetContainsAny, ConstraintSetContains: if c.RTarget == "" { mErr.Errors = append(mErr.Errors, fmt.Errorf("Set contains constraint requires an RTarget")) } @@ -5933,7 +6239,7 @@ func (a *Affinity) Validate() error { // Perform additional validation based on operand switch a.Operand { - case ConstraintSetContainsAll, ConstraintSetContaintsAny, ConstraintSetContains: + case ConstraintSetContainsAll, ConstraintSetContainsAny, ConstraintSetContains: if a.RTarget == "" { mErr.Errors = append(mErr.Errors, fmt.Errorf("Set contains operators require an RTarget")) } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 8c0c2f0b2..88bebcbe8 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1571,13 +1571,15 @@ func TestConstraint_Validate(t *testing.T) { t.Fatalf("expected valid constraint: %v", err) } - // Perform set_contains validation - c.Operand = ConstraintSetContains + // Perform set_contains* validation c.RTarget = "" - err = c.Validate() - mErr = err.(*multierror.Error) - if !strings.Contains(mErr.Errors[0].Error(), "requires an RTarget") { - t.Fatalf("err: %s", err) + for _, o := range []string{ConstraintSetContains, ConstraintSetContainsAll, ConstraintSetContainsAny} { + c.Operand = o + err = c.Validate() + mErr = err.(*multierror.Error) + if !strings.Contains(mErr.Errors[0].Error(), "requires an RTarget") { + t.Fatalf("err: %s", err) + } } // Perform LTarget validation diff --git a/plugins/shared/structs/attribute.go b/plugins/shared/structs/attribute.go index 76d3f2bfb..6836a9924 100644 --- a/plugins/shared/structs/attribute.go +++ b/plugins/shared/structs/attribute.go @@ -53,6 +53,53 @@ func (u *Unit) Comparable(o *Unit) bool { return u.Base == o.Base } +// ParseAttribute takes a string and parses it into an attribute, pulling out +// units if they are specified as a suffix on a number. +func ParseAttribute(input string) *Attribute { + ll := len(input) + if ll == 0 { + return &Attribute{String: helper.StringToPtr(input)} + } + + // Check if the string is a number ending with potential units + var unit string + numeric := input + if unicode.IsLetter(rune(input[ll-1])) { + // Try suffix matching + for _, u := range lengthSortedUnits { + if strings.HasSuffix(input, u) { + unit = u + break + } + } + + // Check if we know about the unit. + if len(unit) != 0 { + numeric = strings.TrimSpace(strings.TrimSuffix(input, unit)) + } + } + + // Try to parse as an int + i, err := strconv.ParseInt(numeric, 10, 64) + if err == nil { + return &Attribute{Int: helper.Int64ToPtr(i), Unit: unit} + } + + // Try to parse as a float + f, err := strconv.ParseFloat(numeric, 64) + if err == nil { + return &Attribute{Float: helper.Float64ToPtr(f), Unit: unit} + } + + // Try to parse as a bool + b, err := strconv.ParseBool(input) + if err == nil { + return &Attribute{Bool: helper.BoolToPtr(b)} + } + + return &Attribute{String: helper.StringToPtr(input)} +} + // Attribute is used to describe the value of an attribute, optionally // specifying units type Attribute struct { @@ -72,6 +119,104 @@ type Attribute struct { Unit string } +// NewStringAttribute returns a new string attribute. +func NewStringAttribute(s string) *Attribute { + return &Attribute{ + String: helper.StringToPtr(s), + } +} + +// NewBoolAttribute returns a new boolean attribute. +func NewBoolAttribute(b bool) *Attribute { + return &Attribute{ + Bool: helper.BoolToPtr(b), + } +} + +// NewIntergerAttribute returns a new integer attribute. The unit is not checked +// to be valid. +func NewIntAttribute(i int64, unit string) *Attribute { + return &Attribute{ + Int: helper.Int64ToPtr(i), + Unit: unit, + } +} + +// NewFloatAttribute returns a new float attribute. The unit is not checked to +// be valid. +func NewFloatAttribute(f float64, unit string) *Attribute { + return &Attribute{ + Float: helper.Float64ToPtr(f), + Unit: unit, + } +} + +// GetString returns the string value of the attribute or false if the attribute +// doesn't contain a string. +func (a *Attribute) GetString() (value string, ok bool) { + if a.String == nil { + return "", false + } + + return *a.String, true +} + +// GetBool returns the boolean value of the attribute or false if the attribute +// doesn't contain a boolean. +func (a *Attribute) GetBool() (value bool, ok bool) { + if a.Bool == nil { + return false, false + } + + return *a.Bool, true +} + +// GetInt returns the integer value of the attribute or false if the attribute +// doesn't contain a integer. +func (a *Attribute) GetInt() (value int64, ok bool) { + if a.Int == nil { + return 0, false + } + + return *a.Int, true +} + +// GetFloat returns the float value of the attribute or false if the attribute +// doesn't contain a float. +func (a *Attribute) GetFloat() (value float64, ok bool) { + if a.Float == nil { + return 0.0, false + } + + return *a.Float, true +} + +// Copy returns a copied version of the attribute +func (a *Attribute) Copy() *Attribute { + if a == nil { + return nil + } + + ca := &Attribute{ + Unit: a.Unit, + } + + if a.Float != nil { + ca.Float = helper.Float64ToPtr(*a.Float) + } + if a.Int != nil { + ca.Int = helper.Int64ToPtr(*a.Int) + } + if a.Bool != nil { + ca.Bool = helper.BoolToPtr(*a.Bool) + } + if a.String != nil { + ca.String = helper.StringToPtr(*a.String) + } + + return ca +} + // GoString returns a string representation of the attribute func (a *Attribute) GoString() string { if a == nil { @@ -133,6 +278,39 @@ func (a *Attribute) Validate() error { return nil } +// Comparable returns whether the two attributes are comparable +func (a *Attribute) Comparable(b *Attribute) bool { + if a == nil || b == nil { + return false + } + + // First use the units to decide if comparison is possible + aUnit := a.getTypedUnit() + bUnit := b.getTypedUnit() + if aUnit != nil && bUnit != nil { + return aUnit.Comparable(bUnit) + } else if aUnit != nil && bUnit == nil { + return false + } else if aUnit == nil && bUnit != nil { + return false + } + + if a.String != nil { + if b.String != nil { + return true + } + return false + } + if a.Bool != nil { + if b.Bool != nil { + return true + } + return false + } + + return true +} + // Compare compares two attributes. If the returned boolean value is false, it // means the values are not comparable, either because they are of different // types (bool versus int) or the units are incompatible for comparison. @@ -278,102 +456,7 @@ func (a *Attribute) getInt() int64 { return i } -// Comparable returns whether they are comparable -func (a *Attribute) Comparable(b *Attribute) bool { - if a == nil || b == nil { - return false - } - - // First use the units to decide if comparison is possible - aUnit := a.getTypedUnit() - bUnit := b.getTypedUnit() - if aUnit != nil && bUnit != nil { - return aUnit.Comparable(bUnit) - } else if aUnit != nil && bUnit == nil { - return false - } else if aUnit == nil && bUnit != nil { - return false - } - - if a.String != nil { - if b.String != nil { - return true - } - return false - } - if a.Bool != nil { - if b.Bool != nil { - return true - } - return false - } - - return true -} - // getTypedUnit returns the Unit for the attribute or nil if no unit exists. func (a *Attribute) getTypedUnit() *Unit { return UnitIndex[a.Unit] } - -// ParseAttribute takes a string and parses it into an attribute, pulling out -// units if they are specified as a suffix on a number -func ParseAttribute(input string) *Attribute { - ll := len(input) - if ll == 0 { - return &Attribute{String: helper.StringToPtr(input)} - } - - // Try to parse as a bool - b, err := strconv.ParseBool(input) - if err == nil { - return &Attribute{Bool: helper.BoolToPtr(b)} - } - - // Check if the string is a number ending with potential units - if unicode.IsLetter(rune(input[ll-1])) { - // Try suffix matching - var unit string - for _, u := range lengthSortedUnits { - if strings.HasSuffix(input, u) { - unit = u - break - } - } - - // Check if we know about the unit. If we don't we can only treat this - // as a string - if len(unit) == 0 { - return &Attribute{String: helper.StringToPtr(input)} - } - - // Grab the numeric - numeric := strings.TrimSpace(strings.TrimSuffix(input, unit)) - - // Try to parse as an int - i, err := strconv.ParseInt(numeric, 10, 64) - if err == nil { - return &Attribute{Int: helper.Int64ToPtr(i), Unit: unit} - } - - // Try to parse as a float - f, err := strconv.ParseFloat(numeric, 64) - if err == nil { - return &Attribute{Float: helper.Float64ToPtr(f), Unit: unit} - } - } - - // Try to parse as an int - i, err := strconv.ParseInt(input, 10, 64) - if err == nil { - return &Attribute{Int: helper.Int64ToPtr(i)} - } - - // Try to parse as a float - f, err := strconv.ParseFloat(input, 64) - if err == nil { - return &Attribute{Float: helper.Float64ToPtr(f)} - } - - return &Attribute{String: helper.StringToPtr(input)} -} diff --git a/plugins/shared/structs/attribute_test.go b/plugins/shared/structs/attribute_test.go index f49f43e88..b30506764 100644 --- a/plugins/shared/structs/attribute_test.go +++ b/plugins/shared/structs/attribute_test.go @@ -558,6 +558,12 @@ func TestAttribute_ParseAndValidate(t *testing.T) { Bool: helper.BoolToPtr(false), }, }, + { + Input: "1", + Expected: &Attribute{ + Int: helper.Int64ToPtr(1), + }, + }, { Input: "100", Expected: &Attribute{ diff --git a/plugins/shared/structs/util.go b/plugins/shared/structs/util.go index 5f84fc4ad..240335571 100644 --- a/plugins/shared/structs/util.go +++ b/plugins/shared/structs/util.go @@ -88,3 +88,16 @@ func Pow(a, b int64) int64 { } return p } + +// CopyMapStringAttribute copies a map of string to Attribute +func CopyMapStringAttribute(in map[string]*Attribute) map[string]*Attribute { + if in == nil { + return nil + } + + out := make(map[string]*Attribute, len(in)) + for k, v := range in { + out[k] = v.Copy() + } + return out +} diff --git a/scheduler/feasible.go b/scheduler/feasible.go index eb74fa1f9..7b1a09d01 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -7,6 +7,8 @@ import ( "strconv" "strings" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" + "github.com/hashicorp/go-version" "github.com/hashicorp/nomad/nomad/structs" ) @@ -468,8 +470,10 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool { return checkVersionMatch(ctx, lVal, rVal) case structs.ConstraintRegex: return checkRegexpMatch(ctx, lVal, rVal) - case structs.ConstraintSetContains: + case structs.ConstraintSetContains, structs.ConstraintSetContainsAll: return checkSetContainsAll(ctx, lVal, rVal) + case structs.ConstraintSetContainsAny: + return checkSetContainsAny(lVal, rVal) default: return false } @@ -478,7 +482,7 @@ func checkConstraint(ctx Context, operand string, lVal, rVal interface{}) bool { // checkAffinity checks if a specific affinity is satisfied func checkAffinity(ctx Context, operand string, lVal, rVal interface{}) bool { switch operand { - case structs.ConstraintSetContaintsAny: + case structs.ConstraintSetContainsAny: return checkSetContainsAny(lVal, rVal) case structs.ConstraintSetContainsAll, structs.ConstraintSetContains: return checkSetContainsAll(ctx, lVal, rVal) @@ -556,6 +560,48 @@ func checkVersionMatch(ctx Context, lVal, rVal interface{}) bool { return constraints.Check(vers) } +// checkAttributeVersionMatch is used to compare a version on the +// left hand side with a set of constraints on the right hand side +func checkAttributeVersionMatch(ctx Context, lVal, rVal *psstructs.Attribute) bool { + // Parse the version + var versionStr string + if s, ok := lVal.GetString(); ok { + versionStr = s + } else if i, ok := lVal.GetInt(); ok { + versionStr = fmt.Sprintf("%d", i) + } else { + return false + } + + // Parse the version + vers, err := version.NewVersion(versionStr) + if err != nil { + return false + } + + // Constraint must be a string + constraintStr, ok := rVal.GetString() + if !ok { + return false + } + + // Check the cache for a match + cache := ctx.VersionConstraintCache() + constraints := cache[constraintStr] + + // Parse the constraints + if constraints == nil { + constraints, err = version.NewConstraint(constraintStr) + if err != nil { + return false + } + cache[constraintStr] = constraints + } + + // Check the constraints against the version + return constraints.Check(vers) +} + // checkRegexpMatch is used to compare a value on the // left hand side with a regexp on the right hand side func checkRegexpMatch(ctx Context, lVal, rVal interface{}) bool { @@ -768,3 +814,238 @@ OUTER: return option } } + +// DeviceChecker is a FeasibilityChecker which returns whether a node has the +// devices necessary to scheduler a task group. +type DeviceChecker struct { + ctx Context + + // required is the set of requested devices that must exist on the node + required []*structs.RequestedDevice + + // requiresDevices indicates if the task group requires devices + requiresDevices bool +} + +// NewDeviceChecker creates a DeviceChecker +func NewDeviceChecker(ctx Context) *DeviceChecker { + return &DeviceChecker{ + ctx: ctx, + } +} + +func (c *DeviceChecker) SetTaskGroup(tg *structs.TaskGroup) { + c.required = nil + for _, task := range tg.Tasks { + c.required = append(c.required, task.Resources.Devices...) + } + c.requiresDevices = len(c.required) != 0 +} + +func (c *DeviceChecker) Feasible(option *structs.Node) bool { + if c.hasDevices(option) { + return true + } + + c.ctx.Metrics().FilterNode(option, "missing devices") + return false +} + +func (c *DeviceChecker) hasDevices(option *structs.Node) bool { + if !c.requiresDevices { + return true + } + + // COMPAT(0.11): Remove in 0.11 + // The node does not have the new resources object so it can not have any + // devices + if option.NodeResources == nil { + return false + } + + // Check if the node has any devices + nodeDevs := option.NodeResources.Devices + if len(nodeDevs) == 0 { + return false + } + + // Create a mapping of node devices to the remaining count + available := make(map[*structs.NodeDeviceResource]uint64, len(nodeDevs)) + for _, d := range nodeDevs { + var healthy uint64 = 0 + for _, instance := range d.Instances { + if instance.Healthy { + healthy++ + } + } + if healthy != 0 { + available[d] = healthy + } + } + + // Go through the required devices trying to find matches +OUTER: + for _, req := range c.required { + // Determine how many there are to place + desiredCount := req.Count + + // Go through the device resources and see if we have a match + for d, unused := range available { + if unused == 0 { + // Depleted + continue + } + + if nodeDeviceMatches(c.ctx, d, req) { + // Consume the instances + if unused >= desiredCount { + // This device satisfies all our requests + available[d] -= desiredCount + + // Move on to the next request + continue OUTER + } else { + // This device partially satisfies our requests + available[d] = 0 + desiredCount -= unused + } + } + } + + // We couldn't match the request for the device + if desiredCount > 0 { + return false + } + } + + // Only satisfied if there are no more devices to place + return true +} + +// nodeDeviceMatches checks if the device matches the request and its +// constraints. It doesn't check the count. +func nodeDeviceMatches(ctx Context, d *structs.NodeDeviceResource, req *structs.RequestedDevice) bool { + if !d.ID().Matches(req.ID()) { + return false + } + + // There are no constraints to consider + if len(req.Constraints) == 0 { + return true + } + + for _, c := range req.Constraints { + // Resolve the targets + lVal, ok := resolveDeviceTarget(c.LTarget, d) + if !ok { + return false + } + rVal, ok := resolveDeviceTarget(c.RTarget, d) + if !ok { + return false + } + + // Check if satisfied + if !checkAttributeConstraint(ctx, c.Operand, lVal, rVal) { + return false + } + } + + return true +} + +// resolveDeviceTarget is used to resolve the LTarget and RTarget of a Constraint +// when used on a device +func resolveDeviceTarget(target string, d *structs.NodeDeviceResource) (*psstructs.Attribute, bool) { + // If no prefix, this must be a literal value + if !strings.HasPrefix(target, "${") { + return psstructs.ParseAttribute(target), true + } + + // Handle the interpolations + switch { + case "${driver.model}" == target: + return psstructs.NewStringAttribute(d.Name), true + + case "${driver.vendor}" == target: + return psstructs.NewStringAttribute(d.Vendor), true + + case "${driver.type}" == target: + return psstructs.NewStringAttribute(d.Type), true + + case strings.HasPrefix(target, "${driver.attr."): + attr := strings.TrimPrefix(target, "${driver.attr.") + attr = strings.TrimSuffix(attr, "}") + val, ok := d.Attributes[attr] + return val, ok + + default: + return nil, false + } +} + +// checkAttributeConstraint checks if a constraint is satisfied +func checkAttributeConstraint(ctx Context, operand string, lVal, rVal *psstructs.Attribute) bool { + // Check for constraints not handled by this checker. + switch operand { + case structs.ConstraintDistinctHosts, structs.ConstraintDistinctProperty: + return true + default: + break + } + + switch operand { + case "<", "<=", ">", ">=", "=", "==", "is", "!=", "not": + v, ok := lVal.Compare(rVal) + if !ok { + return false + } + + switch operand { + case "not", "!=": + return v != 0 + case "is", "==", "=": + return v == 0 + case "<": + return v == -1 + case "<=": + return v != 1 + case ">": + return v == 1 + case ">=": + return v != -1 + default: + return false + } + + case structs.ConstraintVersion: + return checkAttributeVersionMatch(ctx, lVal, rVal) + case structs.ConstraintRegex: + ls, ok := lVal.GetString() + rs, ok2 := rVal.GetString() + if !ok || !ok2 { + return false + } + return checkRegexpMatch(ctx, ls, rs) + case structs.ConstraintSetContains, structs.ConstraintSetContainsAll: + ls, ok := lVal.GetString() + rs, ok2 := rVal.GetString() + if !ok || !ok2 { + return false + } + + return checkSetContainsAll(ctx, ls, rs) + case structs.ConstraintSetContainsAny: + ls, ok := lVal.GetString() + rs, ok2 := rVal.GetString() + if !ok || !ok2 { + return false + } + + return checkSetContainsAny(ls, rs) + default: + return false + } + + return false +} diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index 9346d0ba1..dc8e8fcec 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -9,6 +9,7 @@ import ( "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + psstructs "github.com/hashicorp/nomad/plugins/shared/structs" "github.com/stretchr/testify/require" ) @@ -1619,3 +1620,458 @@ func TestSetContainsAny(t *testing.T) { require.True(t, checkSetContainsAny("a", "a")) require.False(t, checkSetContainsAny("b", "a")) } + +func TestDeviceChecker(t *testing.T) { + getTg := func(devices ...*structs.RequestedDevice) *structs.TaskGroup { + return &structs.TaskGroup{ + Name: "example", + Tasks: []*structs.Task{ + { + Resources: &structs.Resources{ + Devices: devices, + }, + }, + }, + } + } + + // Just type + gpuTypeReq := &structs.RequestedDevice{ + Name: "gpu", + Count: 1, + } + fpgaTypeReq := &structs.RequestedDevice{ + Name: "fpga", + Count: 1, + } + + // vendor/type + gpuVendorTypeReq := &structs.RequestedDevice{ + Name: "nvidia/gpu", + Count: 1, + } + fpgaVendorTypeReq := &structs.RequestedDevice{ + Name: "nvidia/fpga", + Count: 1, + } + + // vendor/type/model + gpuFullReq := &structs.RequestedDevice{ + Name: "nvidia/gpu/1080ti", + Count: 1, + } + fpgaFullReq := &structs.RequestedDevice{ + Name: "nvidia/fpga/F100", + Count: 1, + } + + // Just type but high count + gpuTypeHighCountReq := &structs.RequestedDevice{ + Name: "gpu", + Count: 3, + } + + getNode := func(devices ...*structs.NodeDeviceResource) *structs.Node { + n := mock.Node() + n.NodeResources.Devices = devices + return n + } + + nvidia := &structs.NodeDeviceResource{ + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + Attributes: map[string]*psstructs.Attribute{ + "memory": psstructs.NewIntAttribute(4, psstructs.UnitGiB), + "pci_bandwidth": psstructs.NewIntAttribute(995, psstructs.UnitMiBPerS), + "cores_clock": psstructs.NewIntAttribute(800, psstructs.UnitMHz), + }, + Instances: []*structs.NodeDevice{ + &structs.NodeDevice{ + ID: uuid.Generate(), + Healthy: true, + }, + &structs.NodeDevice{ + ID: uuid.Generate(), + Healthy: true, + }, + }, + } + + nvidiaUnhealthy := &structs.NodeDeviceResource{ + Vendor: "nvidia", + Type: "gpu", + Name: "1080ti", + Instances: []*structs.NodeDevice{ + &structs.NodeDevice{ + ID: uuid.Generate(), + Healthy: false, + }, + &structs.NodeDevice{ + ID: uuid.Generate(), + Healthy: false, + }, + }, + } + + intel := &structs.NodeDeviceResource{ + Vendor: "intel", + Type: "gpu", + Name: "GT640", + Instances: []*structs.NodeDevice{ + &structs.NodeDevice{ + ID: uuid.Generate(), + Healthy: true, + }, + &structs.NodeDevice{ + ID: uuid.Generate(), + Healthy: false, + }, + }, + } + + cases := []struct { + Name string + Result bool + NodeDevices []*structs.NodeDeviceResource + RequestedDevices []*structs.RequestedDevice + }{ + { + Name: "no devices on node", + Result: false, + NodeDevices: nil, + RequestedDevices: []*structs.RequestedDevice{gpuTypeReq}, + }, + { + Name: "no requested devices on empty node", + Result: true, + NodeDevices: nil, + RequestedDevices: nil, + }, + { + Name: "gpu devices by type", + Result: true, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{gpuTypeReq}, + }, + { + Name: "wrong devices by type", + Result: false, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{fpgaTypeReq}, + }, + { + Name: "devices by type unhealthy node", + Result: false, + NodeDevices: []*structs.NodeDeviceResource{nvidiaUnhealthy}, + RequestedDevices: []*structs.RequestedDevice{gpuTypeReq}, + }, + { + Name: "gpu devices by vendor/type", + Result: true, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{gpuVendorTypeReq}, + }, + { + Name: "wrong devices by vendor/type", + Result: false, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{fpgaVendorTypeReq}, + }, + { + Name: "gpu devices by vendor/type/model", + Result: true, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{gpuFullReq}, + }, + { + Name: "wrong devices by vendor/type/model", + Result: false, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{fpgaFullReq}, + }, + { + Name: "too many requested", + Result: false, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, + }, + { + Name: "request split over groups", + Result: true, + NodeDevices: []*structs.NodeDeviceResource{nvidia, intel}, + RequestedDevices: []*structs.RequestedDevice{gpuTypeHighCountReq}, + }, + { + Name: "meets constraints requirement", + Result: true, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + Constraints: []*structs.Constraint{ + { + Operand: "=", + LTarget: "${driver.model}", + RTarget: "1080ti", + }, + { + Operand: ">", + LTarget: "${driver.attr.memory}", + RTarget: "1320.5 MB", + }, + { + Operand: "<=", + LTarget: "${driver.attr.pci_bandwidth}", + RTarget: ".98 GiB/s", + }, + { + Operand: "=", + LTarget: "${driver.attr.cores_clock}", + RTarget: "800MHz", + }, + }, + }, + }, + }, + { + Name: "meets constraints requirement multiple count", + Result: true, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 2, + Constraints: []*structs.Constraint{ + { + Operand: "=", + LTarget: "${driver.model}", + RTarget: "1080ti", + }, + { + Operand: ">", + LTarget: "${driver.attr.memory}", + RTarget: "1320.5 MB", + }, + { + Operand: "<=", + LTarget: "${driver.attr.pci_bandwidth}", + RTarget: ".98 GiB/s", + }, + { + Operand: "=", + LTarget: "${driver.attr.cores_clock}", + RTarget: "800MHz", + }, + }, + }, + }, + }, + { + Name: "meets constraints requirement over count", + Result: false, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 5, + Constraints: []*structs.Constraint{ + { + Operand: "=", + LTarget: "${driver.model}", + RTarget: "1080ti", + }, + { + Operand: ">", + LTarget: "${driver.attr.memory}", + RTarget: "1320.5 MB", + }, + { + Operand: "<=", + LTarget: "${driver.attr.pci_bandwidth}", + RTarget: ".98 GiB/s", + }, + { + Operand: "=", + LTarget: "${driver.attr.cores_clock}", + RTarget: "800MHz", + }, + }, + }, + }, + }, + { + Name: "does not meet first constraint", + Result: false, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + Constraints: []*structs.Constraint{ + { + Operand: "=", + LTarget: "${driver.model}", + RTarget: "2080ti", + }, + { + Operand: ">", + LTarget: "${driver.attr.memory}", + RTarget: "1320.5 MB", + }, + { + Operand: "<=", + LTarget: "${driver.attr.pci_bandwidth}", + RTarget: ".98 GiB/s", + }, + { + Operand: "=", + LTarget: "${driver.attr.cores_clock}", + RTarget: "800MHz", + }, + }, + }, + }, + }, + { + Name: "does not meet second constraint", + Result: false, + NodeDevices: []*structs.NodeDeviceResource{nvidia}, + RequestedDevices: []*structs.RequestedDevice{ + { + Name: "nvidia/gpu", + Count: 1, + Constraints: []*structs.Constraint{ + { + Operand: "=", + LTarget: "${driver.model}", + RTarget: "1080ti", + }, + { + Operand: "<", + LTarget: "${driver.attr.memory}", + RTarget: "1320.5 MB", + }, + { + Operand: "<=", + LTarget: "${driver.attr.pci_bandwidth}", + RTarget: ".98 GiB/s", + }, + { + Operand: "=", + LTarget: "${driver.attr.cores_clock}", + RTarget: "800MHz", + }, + }, + }, + }, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + _, ctx := testContext(t) + checker := NewDeviceChecker(ctx) + checker.SetTaskGroup(getTg(c.RequestedDevices...)) + if act := checker.Feasible(getNode(c.NodeDevices...)); act != c.Result { + t.Fatalf("got %v; want %v", act, c.Result) + } + }) + } +} + +func TestCheckAttributeConstraint(t *testing.T) { + type tcase struct { + op string + lVal, rVal *psstructs.Attribute + result bool + } + cases := []tcase{ + { + op: "=", + lVal: psstructs.NewStringAttribute("foo"), + rVal: psstructs.NewStringAttribute("foo"), + result: true, + }, + { + op: "is", + lVal: psstructs.NewStringAttribute("foo"), + rVal: psstructs.NewStringAttribute("foo"), + result: true, + }, + { + op: "==", + lVal: psstructs.NewStringAttribute("foo"), + rVal: psstructs.NewStringAttribute("foo"), + result: true, + }, + { + op: "!=", + lVal: psstructs.NewStringAttribute("foo"), + rVal: psstructs.NewStringAttribute("foo"), + result: false, + }, + { + op: "!=", + lVal: psstructs.NewStringAttribute("foo"), + rVal: psstructs.NewStringAttribute("bar"), + result: true, + }, + { + op: "not", + lVal: psstructs.NewStringAttribute("foo"), + rVal: psstructs.NewStringAttribute("bar"), + result: true, + }, + { + op: structs.ConstraintVersion, + lVal: psstructs.NewStringAttribute("1.2.3"), + rVal: psstructs.NewStringAttribute("~> 1.0"), + result: true, + }, + { + op: structs.ConstraintRegex, + lVal: psstructs.NewStringAttribute("foobarbaz"), + rVal: psstructs.NewStringAttribute("[\\w]+"), + result: true, + }, + { + op: "<", + lVal: psstructs.NewStringAttribute("foo"), + rVal: psstructs.NewStringAttribute("bar"), + result: false, + }, + { + op: structs.ConstraintSetContains, + lVal: psstructs.NewStringAttribute("foo,bar,baz"), + rVal: psstructs.NewStringAttribute("foo, bar "), + result: true, + }, + { + op: structs.ConstraintSetContainsAll, + lVal: psstructs.NewStringAttribute("foo,bar,baz"), + rVal: psstructs.NewStringAttribute("foo, bar "), + result: true, + }, + { + op: structs.ConstraintSetContains, + lVal: psstructs.NewStringAttribute("foo,bar,baz"), + rVal: psstructs.NewStringAttribute("foo,bam"), + result: false, + }, + { + op: structs.ConstraintSetContainsAny, + lVal: psstructs.NewStringAttribute("foo,bar,baz"), + rVal: psstructs.NewStringAttribute("foo,bam"), + result: true, + }, + } + + for _, tc := range cases { + _, ctx := testContext(t) + if res := checkAttributeConstraint(ctx, tc.op, tc.lVal, tc.rVal); res != tc.result { + t.Fatalf("TC: %#v, Result: %v", tc, res) + } + } +} diff --git a/scheduler/stack.go b/scheduler/stack.go index 5d81aa8fa..6be27c5e2 100644 --- a/scheduler/stack.go +++ b/scheduler/stack.go @@ -48,6 +48,7 @@ type GenericStack struct { jobConstraint *ConstraintChecker taskGroupDrivers *DriverChecker taskGroupConstraint *ConstraintChecker + taskGroupDevices *DeviceChecker distinctHostsConstraint *DistinctHostsIterator distinctPropertyConstraint *DistinctPropertyIterator @@ -87,12 +88,15 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack { // Filter on task group constraints second s.taskGroupConstraint = NewConstraintChecker(ctx, nil) + // Filter on task group devices + s.taskGroupDevices = NewDeviceChecker(ctx) + // Create the feasibility wrapper which wraps all feasibility checks in // which feasibility checking can be skipped if the computed node class has // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint} + tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs) // Filter on distinct host constraints. @@ -195,6 +199,7 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) + s.taskGroupDevices.SetTaskGroup(tg) s.distinctHostsConstraint.SetTaskGroup(tg) s.distinctPropertyConstraint.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) @@ -225,13 +230,16 @@ func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ra // SystemStack is the Stack used for the System scheduler. It is designed to // attempt to make placements on all nodes. type SystemStack struct { - ctx Context - source *StaticIterator - wrappedChecks *FeasibilityWrapper - quota FeasibleIterator - jobConstraint *ConstraintChecker - taskGroupDrivers *DriverChecker - taskGroupConstraint *ConstraintChecker + ctx Context + source *StaticIterator + + wrappedChecks *FeasibilityWrapper + quota FeasibleIterator + jobConstraint *ConstraintChecker + taskGroupDrivers *DriverChecker + taskGroupConstraint *ConstraintChecker + taskGroupDevices *DeviceChecker + distinctPropertyConstraint *DistinctPropertyIterator binPack *BinPackIterator scoreNorm *ScoreNormalizationIterator @@ -259,12 +267,15 @@ func NewSystemStack(ctx Context) *SystemStack { // Filter on task group constraints second s.taskGroupConstraint = NewConstraintChecker(ctx, nil) + // Filter on task group devices + s.taskGroupDevices = NewDeviceChecker(ctx) + // Create the feasibility wrapper which wraps all feasibility checks in // which feasibility checking can be skipped if the computed node class has // previously been marked as eligible or ineligible. Generally this will be // checks that only needs to examine the single node to determine feasibility. jobs := []FeasibilityChecker{s.jobConstraint} - tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint} + tgs := []FeasibilityChecker{s.taskGroupDrivers, s.taskGroupConstraint, s.taskGroupDevices} s.wrappedChecks = NewFeasibilityWrapper(ctx, s.quota, jobs, tgs) // Filter on distinct property constraints. @@ -311,6 +322,7 @@ func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *Ran // Update the parameters of iterators s.taskGroupDrivers.SetDrivers(tgConstr.drivers) s.taskGroupConstraint.SetConstraints(tgConstr.constraints) + s.taskGroupDevices.SetTaskGroup(tg) s.wrappedChecks.SetTaskGroup(tg.Name) s.distinctPropertyConstraint.SetTaskGroup(tg) s.binPack.SetTaskGroup(tg) diff --git a/scheduler/util_test.go b/scheduler/util_test.go index e72113248..08f5812aa 100644 --- a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -758,7 +758,10 @@ func TestInplaceUpdate_ChangedTaskGroup(t *testing.T) { // Create a new task group that prevents in-place updates. tg := &structs.TaskGroup{} *tg = *job.TaskGroups[0] - task := &structs.Task{Name: "FOO"} + task := &structs.Task{ + Name: "FOO", + Resources: &structs.Resources{}, + } tg.Tasks = nil tg.Tasks = append(tg.Tasks, task)