diff --git a/api/jobs_test.go b/api/jobs_test.go index 0df90807e..0453f033a 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -652,6 +652,7 @@ func TestJobs_Canonicalize(t *testing.T) { }, Resources: &Resources{ CPU: intToPtr(500), + Cores: intToPtr(0), MemoryMB: intToPtr(256), Networks: []*NetworkResource{ { diff --git a/api/nodes.go b/api/nodes.go index 4dc399e3f..3d951030e 100644 --- a/api/nodes.go +++ b/api/nodes.go @@ -508,7 +508,9 @@ type NodeResources struct { } type NodeCpuResources struct { - CpuShares int64 + CpuShares int64 + TotalCpuCores uint16 + ReservableCpuCores []uint16 } type NodeMemoryResources struct { diff --git a/api/resources.go b/api/resources.go index 5b5501a5c..ae420b22f 100644 --- a/api/resources.go +++ b/api/resources.go @@ -8,6 +8,7 @@ import ( // a given task or task group. type Resources struct { CPU *int `hcl:"cpu,optional"` + Cores *int `hcl:"cores,optional"` MemoryMB *int `mapstructure:"memory" hcl:"memory,optional"` DiskMB *int `mapstructure:"disk" hcl:"disk,optional"` Networks []*NetworkResource `hcl:"network,block"` @@ -24,9 +25,21 @@ type Resources struct { // where they are not provided. func (r *Resources) Canonicalize() { defaultResources := DefaultResources() - if r.CPU == nil { - r.CPU = defaultResources.CPU + if r.Cores == nil { + r.Cores = defaultResources.Cores + + // only set cpu to the default value if it and cores is not defined + if r.CPU == nil { + r.CPU = defaultResources.CPU + } } + + // CPU will be set to the default if cores is nil above. 
+ // If cpu is nil here then cores has been set and cpu should be 0 + if r.CPU == nil { + r.CPU = intToPtr(0) + } + if r.MemoryMB == nil { r.MemoryMB = defaultResources.MemoryMB } @@ -42,6 +55,7 @@ func (r *Resources) Canonicalize() { func DefaultResources() *Resources { return &Resources{ CPU: intToPtr(100), + Cores: intToPtr(0), MemoryMB: intToPtr(300), } } @@ -54,6 +68,7 @@ func DefaultResources() *Resources { func MinResources() *Resources { return &Resources{ CPU: intToPtr(1), + Cores: intToPtr(0), MemoryMB: intToPtr(10), } } diff --git a/api/resources_test.go b/api/resources_test.go new file mode 100644 index 000000000..9db95a179 --- /dev/null +++ b/api/resources_test.go @@ -0,0 +1,55 @@ +package api + +import ( + "reflect" + "testing" + + "github.com/kr/pretty" +) + +func TestResources_Canonicalize(t *testing.T) { + testCases := []struct { + name string + input *Resources + expected *Resources + }{ + { + name: "empty", + input: &Resources{}, + expected: DefaultResources(), + }, + { + name: "cores", + input: &Resources{ + Cores: intToPtr(2), + MemoryMB: intToPtr(1024), + }, + expected: &Resources{ + CPU: intToPtr(0), + Cores: intToPtr(2), + MemoryMB: intToPtr(1024), + }, + }, + { + name: "cpu", + input: &Resources{ + CPU: intToPtr(500), + MemoryMB: intToPtr(1024), + }, + expected: &Resources{ + CPU: intToPtr(500), + Cores: intToPtr(0), + MemoryMB: intToPtr(1024), + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + tc.input.Canonicalize() + if !reflect.DeepEqual(tc.input, tc.expected) { + t.Fatalf("Name: %v, Diffs:\n%v", tc.name, pretty.Diff(tc.expected, tc.input)) + } + }) + } +} diff --git a/client/client_test.go b/client/client_test.go index a0d0de87c..08c1d0429 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -1483,7 +1483,8 @@ func TestClient_getAllocatedResources(t *testing.T) { expected := structs.ComparableResources{ Flattened: structs.AllocatedTaskResources{ Cpu: structs.AllocatedCpuResources{ - 
CpuShares: 768, + CpuShares: 768, + ReservedCores: []uint16{}, }, Memory: structs.AllocatedMemoryResources{ MemoryMB: 768, diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 8e28be0fd..79e44bba9 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -1189,6 +1189,10 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources { MemoryMB: *in.MemoryMB, } + if in.Cores != nil { + out.Cores = *in.Cores + } + // COMPAT(0.10): Only being used to issue warnings if in.IOPS != nil { out.IOPS = *in.IOPS diff --git a/lib/cpuset/cpuset.go b/lib/cpuset/cpuset.go new file mode 100644 index 000000000..4f03d3f74 --- /dev/null +++ b/lib/cpuset/cpuset.go @@ -0,0 +1,134 @@ +package cpuset + +import ( + "fmt" + "reflect" + "sort" + "strconv" + "strings" +) + +// CPUSet is a set like object that provides methods helpful when working with cpus with systems +// such as the Linux cpuset cgroup subsystem. A CPUSet is immutable and can be safely accessed concurrently. +type CPUSet struct { + cpus map[uint16]struct{} +} + +// New initializes a new CPUSet with 0 or more containing cpus +func New(cpus ...uint16) CPUSet { + cpuset := CPUSet{ + cpus: make(map[uint16]struct{}), + } + + for _, v := range cpus { + cpuset.cpus[v] = struct{}{} + } + + return cpuset +} + +// Size returns to the number of cpus contained in the CPUSet +func (c CPUSet) Size() int { + return len(c.cpus) +} + +// ToSlice returns a sorted slice of uint16 CPU IDs contained in the CPUSet. +func (c CPUSet) ToSlice() []uint16 { + cpus := []uint16{} + for k := range c.cpus { + cpus = append(cpus, k) + } + sort.Slice(cpus, func(i, j int) bool { return cpus[i] < cpus[j] }) + return cpus +} + +// Union returns a new set that is the union of this CPUSet and the supplied other. +// Ex. 
[0,1,2,3].Union([2,3,4,5]) = [0,1,2,3,4,5] +func (c CPUSet) Union(other CPUSet) CPUSet { + s := New() + for k := range c.cpus { + s.cpus[k] = struct{}{} + } + for k := range other.cpus { + s.cpus[k] = struct{}{} + } + return s +} + +// Difference returns a new set that is the difference of this CPUSet and the supplied other. +// [0,1,2,3].Difference([2,3,4]) = [0,1] +func (c CPUSet) Difference(other CPUSet) CPUSet { + s := New() + for k := range c.cpus { + s.cpus[k] = struct{}{} + } + for k := range other.cpus { + delete(s.cpus, k) + } + return s +} + +// IsSubsetOf returns true if all cpus of this CPUSet are present in the other CPUSet. +func (s CPUSet) IsSubsetOf(other CPUSet) bool { + for cpu := range s.cpus { + if _, ok := other.cpus[cpu]; !ok { + return false + } + } + return true +} + +// IsSupersetOf returns true if all cpus of the other CPUSet are present in this CPUSet. +func (s CPUSet) IsSupersetOf(other CPUSet) bool { + for cpu := range other.cpus { + if _, ok := s.cpus[cpu]; !ok { + return false + } + } + return true +} + +// Equals tests the equality of the elements in the CPUSet +func (s CPUSet) Equals(other CPUSet) bool { + return reflect.DeepEqual(s.cpus, other.cpus) +} + +// Parse parses the Linux cpuset list format into a CPUSet; IDs that are negative or do not fit in a uint16 return an error. +// +// Ref: http://man7.org/linux/man-pages/man7/cpuset.7.html#FORMATS +func Parse(s string) (CPUSet, error) { + cpuset := New() + if s == "" { + return cpuset, nil + } + sets := strings.Split(s, ",") + for _, set := range sets { + bounds := strings.Split(set, "-") + if len(bounds) == 1 { + v, err := strconv.ParseUint(bounds[0], 10, 16) + if err != nil { + return New(), err + } + + cpuset.cpus[uint16(v)] = struct{}{} + continue + } + if len(bounds) > 2 { + return New(), fmt.Errorf("failed to parse element %s, more than 1 '-' found", set) + } + + lower, err := strconv.ParseUint(bounds[0], 10, 16) + if err != nil { + return New(), err + } + upper, err := strconv.ParseUint(bounds[1], 10, 16) + if err != nil { + return New(), err + } + for v := lower; v <= upper; v++ { + cpuset.cpus[uint16(v)] = struct{}{} + } + } + + return cpuset, nil +} diff --git 
a/lib/cpuset/cpuset_test.go b/lib/cpuset/cpuset_test.go new file mode 100644 index 000000000..bd1bc1f7d --- /dev/null +++ b/lib/cpuset/cpuset_test.go @@ -0,0 +1,166 @@ +package cpuset + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestCPUSet_Size(t *testing.T) { + set := New(0, 1, 2, 3) + require.Equal(t, 4, set.Size()) + require.Equal(t, 0, New().Size()) +} + +func TestCPUSet_ToSlice(t *testing.T) { + cases := []struct { + desc string + in CPUSet + out []uint16 + }{ + { + "empty cpuset", + New(), + []uint16{}, + }, + { + "in order", + New(0, 1, 2, 3, 4, 5, 6, 7), + []uint16{0, 1, 2, 3, 4, 5, 6, 7}, + }, + { + "out of order", + New(3, 1, 2, 0), + []uint16{0, 1, 2, 3}, + }, + } + + for _, c := range cases { + require.Exactly(t, c.out, c.in.ToSlice(), c.desc) + } +} + +func TestCPUSet_Equals(t *testing.T) { + cases := []struct { + a CPUSet + b CPUSet + shouldEqual bool + }{ + {New(), New(), true}, + {New(5), New(5), true}, + {New(1, 2, 3, 4, 5), New(1, 2, 3, 4, 5), true}, + + {New(), New(5), false}, + {New(5), New(), false}, + {New(), New(1, 2, 3, 4, 5), false}, + {New(1, 2, 3, 4, 5), New(), false}, + {New(5), New(1, 2, 3, 4, 5), false}, + {New(1, 2, 3, 4, 5), New(5), false}, + } + + for _, c := range cases { + require.Equal(t, c.shouldEqual, c.a.Equals(c.b)) + } +} + +func TestCPUSet_Union(t *testing.T) { + cases := []struct { + a CPUSet + b CPUSet + expected CPUSet + }{ + {New(), New(), New()}, + + {New(), New(0), New(0)}, + {New(0), New(), New(0)}, + {New(0), New(0), New(0)}, + + {New(), New(0, 1, 2, 3), New(0, 1, 2, 3)}, + {New(0, 1), New(0, 1, 2, 3), New(0, 1, 2, 3)}, + {New(2, 3), New(4, 5), New(2, 3, 4, 5)}, + {New(3, 4), New(0, 1, 2, 3), New(0, 1, 2, 3, 4)}, + } + + for _, c := range cases { + require.Exactly(t, c.expected.ToSlice(), c.a.Union(c.b).ToSlice()) + } +} + +func TestCPUSet_Difference(t *testing.T) { + cases := []struct { + a CPUSet + b CPUSet + expected CPUSet + }{ + {New(), New(), New()}, + + {New(), New(0), New()}, 
+ {New(0), New(), New(0)}, + {New(0), New(0), New()}, + + {New(0, 1), New(0, 1, 2, 3), New()}, + {New(2, 3), New(4, 5), New(2, 3)}, + {New(3, 4), New(0, 1, 2, 3), New(4)}, + } + + for _, c := range cases { + require.Exactly(t, c.expected.ToSlice(), c.a.Difference(c.b).ToSlice()) + } +} + +func TestCPUSet_IsSubsetOf(t *testing.T) { + cases := []struct { + a CPUSet + b CPUSet + isSubset bool + }{ + {New(0), New(0), true}, + {New(), New(0), true}, + {New(0), New(), false}, + {New(1, 2), New(0, 1, 2, 3), true}, + {New(2, 1), New(0, 1, 2, 3), true}, + {New(3, 4), New(0, 1, 2, 3), false}, + } + + for _, c := range cases { + require.Equal(t, c.isSubset, c.a.IsSubsetOf(c.b)) + } +} + +func TestCPUSet_IsSupersetOf(t *testing.T) { + cases := []struct { + a CPUSet + b CPUSet + isSuperset bool + }{ + {New(0), New(0), true}, + {New(0), New(), true}, + {New(), New(0), false}, + {New(0, 1, 2, 3), New(0), true}, + {New(0, 1, 2, 3), New(2, 3), true}, + {New(0, 1, 2, 3), New(2, 3, 4), false}, + } + + for _, c := range cases { + require.Equal(t, c.isSuperset, c.a.IsSupersetOf(c.b)) + } +} + +func TestParse(t *testing.T) { + cases := []struct { + cpuset string + expected CPUSet + }{ + {"", New()}, + {"1", New(1)}, + {"0,1,2,3", New(0, 1, 2, 3)}, + {"0-3", New(0, 1, 2, 3)}, + {"0,2-3,5", New(0, 2, 3, 5)}, + } + + for _, c := range cases { + result, err := Parse(c.cpuset) + require.NoError(t, err) + require.True(t, result.Equals(c.expected)) + } +} diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index dfb2d44e3..98f59ea81 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -4523,6 +4523,12 @@ func TestTaskDiff(t *testing.T) { Old: "100", New: "200", }, + { + Type: DiffTypeNone, + Name: "Cores", + Old: "0", + New: "0", + }, { Type: DiffTypeEdited, Name: "DiskMB", @@ -4876,6 +4882,12 @@ func TestTaskDiff(t *testing.T) { Old: "100", New: "100", }, + { + Type: DiffTypeNone, + Name: "Cores", + Old: "0", + New: "0", + }, { Type: DiffTypeNone, Name: 
"DiskMB", diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go index 67f771321..926bb16d2 100644 --- a/nomad/structs/funcs.go +++ b/nomad/structs/funcs.go @@ -98,6 +98,9 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi // Compute the allocs' utilization from zero used := new(ComparableResources) + reservedCores := map[uint16]struct{}{} + var coreOverlap bool + // For each alloc, add the resources for _, alloc := range allocs { // Do not consider the resource impact of terminal allocations @@ -105,7 +108,21 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi continue } - used.Add(alloc.ComparableResources()) + cr := alloc.ComparableResources() + used.Add(cr) + + // Adding the comparable resource unions reserved core sets, need to check if reserved cores overlap + for _, core := range cr.Flattened.Cpu.ReservedCores { + if _, ok := reservedCores[core]; ok { + coreOverlap = true + } else { + reservedCores[core] = struct{}{} + } + } + } + + if coreOverlap { + return false, "cores", used, nil } // Check that the node resources (after subtracting reserved) are a diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 1b3eb346c..13a18b2b7 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -269,7 +269,9 @@ func TestAllocsFit(t *testing.T) { n := &Node{ NodeResources: &NodeResources{ Cpu: NodeCpuResources{ - CpuShares: 2000, + CpuShares: 2000, + TotalCpuCores: 2, + ReservableCpuCores: []uint16{0, 1}, }, Memory: NodeMemoryResources{ MemoryMB: 2048, @@ -317,7 +319,8 @@ func TestAllocsFit(t *testing.T) { Tasks: map[string]*AllocatedTaskResources{ "web": { Cpu: AllocatedCpuResources{ - CpuShares: 1000, + CpuShares: 1000, + ReservedCores: []uint16{}, }, Memory: AllocatedMemoryResources{ MemoryMB: 1024, @@ -345,9 +348,9 @@ func TestAllocsFit(t *testing.T) { } // Should fit one allocation - fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) + fit, 
dim, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) require.NoError(err) - require.True(fit) + require.True(fit, "failed for dimension %q", dim) require.EqualValues(1000, used.Flattened.Cpu.CpuShares) require.EqualValues(1024, used.Flattened.Memory.MemoryMB) @@ -357,6 +360,48 @@ func TestAllocsFit(t *testing.T) { require.False(fit) require.EqualValues(2000, used.Flattened.Cpu.CpuShares) require.EqualValues(2048, used.Flattened.Memory.MemoryMB) + + a2 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 500, + ReservedCores: []uint16{0}, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 512, + }, + }, + }, + Shared: AllocatedSharedResources{ + DiskMB: 1000, + Networks: Networks{ + { + Mode: "host", + IP: "10.0.0.1", + }, + }, + }, + }, + } + + // Should fit one allocation + fit, dim, used, err = AllocsFit(n, []*Allocation{a2}, nil, false) + require.NoError(err) + require.True(fit, "failed for dimension %q", dim) + require.EqualValues(500, used.Flattened.Cpu.CpuShares) + require.EqualValues([]uint16{0}, used.Flattened.Cpu.ReservedCores) + require.EqualValues(512, used.Flattened.Memory.MemoryMB) + + // Should not fit second allocation + fit, dim, used, err = AllocsFit(n, []*Allocation{a2, a2}, nil, false) + require.NoError(err) + require.False(fit) + require.EqualValues("cores", dim) + require.EqualValues(1000, used.Flattened.Cpu.CpuShares) + require.EqualValues([]uint16{0}, used.Flattened.Cpu.ReservedCores) + require.EqualValues(1024, used.Flattened.Memory.MemoryMB) } func TestAllocsFit_TerminalAlloc(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index b1e1a1961..a667b7750 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -25,6 +25,8 @@ import ( "strings" "time" + "github.com/hashicorp/nomad/lib/cpuset" + "github.com/hashicorp/cronexpr" "github.com/hashicorp/go-msgpack/codec" 
"github.com/hashicorp/go-multierror" @@ -2178,6 +2180,7 @@ type NodeStubFields struct { // on a client type Resources struct { CPU int + Cores int MemoryMB int DiskMB int IOPS int // COMPAT(0.10): Only being used to issue warnings @@ -2196,6 +2199,7 @@ const ( func DefaultResources() *Resources { return &Resources{ CPU: 100, + Cores: 0, MemoryMB: 300, } } @@ -2208,6 +2212,7 @@ func DefaultResources() *Resources { func MinResources() *Resources { return &Resources{ CPU: 1, + Cores: 0, MemoryMB: 10, } } @@ -2219,6 +2224,11 @@ func (r *Resources) DiskInBytes() int64 { func (r *Resources) Validate() error { var mErr multierror.Error + + if r.Cores > 0 && r.CPU > 0 { + mErr.Errors = append(mErr.Errors, errors.New("Task can only ask for 'cpu' or 'cores' resource, not both.")) + } + if err := r.MeetsMinResources(); err != nil { mErr.Errors = append(mErr.Errors, err) } @@ -2243,6 +2253,9 @@ func (r *Resources) Merge(other *Resources) { if other.CPU != 0 { r.CPU = other.CPU } + if other.Cores != 0 { + r.Cores = other.Cores + } if other.MemoryMB != 0 { r.MemoryMB = other.MemoryMB } @@ -2266,6 +2279,7 @@ func (r *Resources) Equals(o *Resources) bool { return false } return r.CPU == o.CPU && + r.Cores == o.Cores && r.MemoryMB == o.MemoryMB && r.DiskMB == o.DiskMB && r.IOPS == o.IOPS && @@ -2325,7 +2339,7 @@ func (r *Resources) Canonicalize() { func (r *Resources) MeetsMinResources() error { var mErr multierror.Error minResources := MinResources() - if r.CPU < minResources.CPU { + if r.CPU < minResources.CPU && r.Cores == 0 { mErr.Errors = append(mErr.Errors, fmt.Errorf("minimum CPU value is %d; got %d", minResources.CPU, r.CPU)) } if r.MemoryMB < minResources.MemoryMB { @@ -2363,23 +2377,6 @@ func (r *Resources) NetIndex(n *NetworkResource) int { return r.Networks.NetIndex(n) } -// Superset checks if one set of resources is a superset -// of another. This ignores network resources, and the NetworkIndex -// should be used for that. 
-// COMPAT(0.10): Remove in 0.10 -func (r *Resources) Superset(other *Resources) (bool, string) { - if r.CPU < other.CPU { - return false, "cpu" - } - if r.MemoryMB < other.MemoryMB { - return false, "memory" - } - if r.DiskMB < other.DiskMB { - return false, "disk" - } - return true, "" -} - // Add adds the resources of the delta to this, potentially // returning an error if not possible. // COMPAT(0.10): Remove in 0.10 @@ -2807,7 +2804,8 @@ func (n *NodeResources) Comparable() *ComparableResources { c := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: n.Cpu.CpuShares, + CpuShares: n.Cpu.CpuShares, + ReservedCores: n.Cpu.ReservableCpuCores, }, Memory: AllocatedMemoryResources{ MemoryMB: n.Memory.MemoryMB, @@ -2957,6 +2955,15 @@ type NodeCpuResources struct { // CpuShares is the CPU shares available. This is calculated by number of // cores multiplied by the core frequency. CpuShares int64 + + // TotalCpuCores is the total number of cores on the machine. This includes cores not in + // the agent's cpuset if on a linux platform + TotalCpuCores uint16 + + // ReservableCpuCores is the set of cpus which are available to be reserved on the Node. + // This value is currently only reported on Linux platforms which support cgroups and is + // discovered by inspecting the cpuset of the agent's cgroup. 
+ ReservableCpuCores []uint16 } func (n *NodeCpuResources) Merge(o *NodeCpuResources) { @@ -2967,6 +2974,14 @@ func (n *NodeCpuResources) Merge(o *NodeCpuResources) { if o.CpuShares != 0 { n.CpuShares = o.CpuShares } + + if o.TotalCpuCores != 0 { + n.TotalCpuCores = o.TotalCpuCores + } + + if len(o.ReservableCpuCores) != 0 { + n.ReservableCpuCores = o.ReservableCpuCores + } } func (n *NodeCpuResources) Equals(o *NodeCpuResources) bool { @@ -2982,9 +2997,29 @@ func (n *NodeCpuResources) Equals(o *NodeCpuResources) bool { return false } + if n.TotalCpuCores != o.TotalCpuCores { + return false + } + + if len(n.ReservableCpuCores) != len(o.ReservableCpuCores) { + return false + } + for i := range n.ReservableCpuCores { + if n.ReservableCpuCores[i] != o.ReservableCpuCores[i] { + return false + } + } return true } +// SharesPerCore returns CpuShares divided by TotalCpuCores, or 0 when TotalCpuCores is 0 (avoids a divide-by-zero panic). +func (n *NodeCpuResources) SharesPerCore() int64 { + if n.TotalCpuCores == 0 { + return 0 + } + return n.CpuShares / int64(n.TotalCpuCores) +} + // NodeMemoryResources captures the memory resources of the node type NodeMemoryResources struct { // MemoryMB is the total available memory on the node @@ -3298,7 +3329,8 @@ func (n *NodeReservedResources) Comparable() *ComparableResources { c := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: n.Cpu.CpuShares, + CpuShares: n.Cpu.CpuShares, + ReservedCores: n.Cpu.ReservedCpuCores, }, Memory: AllocatedMemoryResources{ MemoryMB: n.Memory.MemoryMB, @@ -3313,7 +3345,8 @@ func (n *NodeReservedResources) Comparable() *ComparableResources { // NodeReservedCpuResources captures the reserved CPU resources of the node. type NodeReservedCpuResources struct { - CpuShares int64 + CpuShares int64 + ReservedCpuCores []uint16 } // NodeReservedMemoryResources captures the reserved memory resources of the node. 
@@ -3552,7 +3585,8 @@ func (a *AllocatedTaskResources) Comparable() *ComparableResources { ret := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: a.Cpu.CpuShares, + CpuShares: a.Cpu.CpuShares, + ReservedCores: a.Cpu.ReservedCores, }, Memory: AllocatedMemoryResources{ MemoryMB: a.Memory.MemoryMB, @@ -3636,7 +3670,8 @@ func (a *AllocatedSharedResources) Canonicalize() { // AllocatedCpuResources captures the allocated CPU resources. type AllocatedCpuResources struct { - CpuShares int64 + CpuShares int64 + ReservedCores []uint16 } func (a *AllocatedCpuResources) Add(delta *AllocatedCpuResources) { @@ -3645,6 +3680,8 @@ func (a *AllocatedCpuResources) Add(delta *AllocatedCpuResources) { } a.CpuShares += delta.CpuShares + + a.ReservedCores = cpuset.New(a.ReservedCores...).Union(cpuset.New(delta.ReservedCores...)).ToSlice() } func (a *AllocatedCpuResources) Subtract(delta *AllocatedCpuResources) { @@ -3653,6 +3690,7 @@ func (a *AllocatedCpuResources) Subtract(delta *AllocatedCpuResources) { } a.CpuShares -= delta.CpuShares + a.ReservedCores = cpuset.New(a.ReservedCores...).Difference(cpuset.New(delta.ReservedCores...)).ToSlice() } func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) { @@ -3663,6 +3701,10 @@ func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) { if other.CpuShares > a.CpuShares { a.CpuShares = other.CpuShares } + + if len(other.ReservedCores) > len(a.ReservedCores) { + a.ReservedCores = other.ReservedCores + } } // AllocatedMemoryResources captures the allocated memory resources. 
@@ -3802,6 +3844,10 @@ func (c *ComparableResources) Superset(other *ComparableResources) (bool, string if c.Flattened.Cpu.CpuShares < other.Flattened.Cpu.CpuShares { return false, "cpu" } + + if len(c.Flattened.Cpu.ReservedCores) > 0 && !cpuset.New(c.Flattened.Cpu.ReservedCores...).IsSupersetOf(cpuset.New(other.Flattened.Cpu.ReservedCores...)) { + return false, "cores" + } if c.Flattened.Memory.MemoryMB < other.Flattened.Memory.MemoryMB { return false, "memory" } diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 09a411f24..4132796a0 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -2621,32 +2621,6 @@ func TestResource_NetIndex(t *testing.T) { } } -func TestResource_Superset(t *testing.T) { - r1 := &Resources{ - CPU: 2000, - MemoryMB: 2048, - DiskMB: 10000, - } - r2 := &Resources{ - CPU: 2000, - MemoryMB: 1024, - DiskMB: 5000, - } - - if s, _ := r1.Superset(r1); !s { - t.Fatalf("bad") - } - if s, _ := r1.Superset(r2); !s { - t.Fatalf("bad") - } - if s, _ := r2.Superset(r1); s { - t.Fatalf("bad") - } - if s, _ := r2.Superset(r2); !s { - t.Fatalf("bad") - } -} - func TestResource_Add(t *testing.T) { r1 := &Resources{ CPU: 2000, @@ -2733,7 +2707,8 @@ func TestComparableResources_Subtract(t *testing.T) { r1 := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: 2000, + CpuShares: 2000, + ReservedCores: []uint16{0, 1}, }, Memory: AllocatedMemoryResources{ MemoryMB: 2048, @@ -2754,7 +2729,8 @@ func TestComparableResources_Subtract(t *testing.T) { r2 := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: 1000, + CpuShares: 1000, + ReservedCores: []uint16{0}, }, Memory: AllocatedMemoryResources{ MemoryMB: 1024, @@ -2776,7 +2752,8 @@ func TestComparableResources_Subtract(t *testing.T) { expect := &ComparableResources{ Flattened: AllocatedTaskResources{ Cpu: AllocatedCpuResources{ - CpuShares: 1000, + CpuShares: 1000, + 
ReservedCores: []uint16{1}, }, Memory: AllocatedMemoryResources{ MemoryMB: 1024, @@ -5532,7 +5509,9 @@ func TestNode_Copy(t *testing.T) { }, NodeResources: &NodeResources{ Cpu: NodeCpuResources{ - CpuShares: 4000, + CpuShares: 4000, + TotalCpuCores: 4, + ReservableCpuCores: []uint16{0, 1, 2, 3}, }, Memory: NodeMemoryResources{ MemoryMB: 8192, @@ -5550,7 +5529,8 @@ func TestNode_Copy(t *testing.T) { }, ReservedResources: &NodeReservedResources{ Cpu: NodeReservedCpuResources{ - CpuShares: 100, + CpuShares: 100, + ReservedCpuCores: []uint16{0}, }, Memory: NodeReservedMemoryResources{ MemoryMB: 256, @@ -5803,7 +5783,8 @@ func TestMultiregion_CopyCanonicalize(t *testing.T) { func TestNodeResources_Merge(t *testing.T) { res := &NodeResources{ Cpu: NodeCpuResources{ - CpuShares: int64(32000), + CpuShares: int64(32000), + TotalCpuCores: 32, }, Memory: NodeMemoryResources{ MemoryMB: int64(64000), @@ -5816,6 +5797,7 @@ func TestNodeResources_Merge(t *testing.T) { } res.Merge(&NodeResources{ + Cpu: NodeCpuResources{ReservableCpuCores: []uint16{0, 1, 2, 3}}, Memory: NodeMemoryResources{ MemoryMB: int64(100000), }, @@ -5828,7 +5810,9 @@ func TestNodeResources_Merge(t *testing.T) { require.Exactly(t, &NodeResources{ Cpu: NodeCpuResources{ - CpuShares: int64(32000), + CpuShares: int64(32000), + TotalCpuCores: 32, + ReservableCpuCores: []uint16{0, 1, 2, 3}, }, Memory: NodeMemoryResources{ MemoryMB: int64(100000), @@ -6050,6 +6034,69 @@ func TestTaskGroup_validateScriptChecksInGroupServices(t *testing.T) { }) } +func TestComparableResources_Superset(t *testing.T) { + base := &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{ + CpuShares: 4000, + ReservedCores: []uint16{0, 1, 2, 3}, + }, + Memory: AllocatedMemoryResources{MemoryMB: 4096}, + }, + Shared: AllocatedSharedResources{DiskMB: 10000}, + } + cases := []struct { + a *ComparableResources + b *ComparableResources + dimension string + }{ + { + a: base, + b: &ComparableResources{ + Flattened: 
AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 1000, ReservedCores: []uint16{0}}, + }, + }, + }, + { + a: base, + b: &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 4000, ReservedCores: []uint16{0, 1, 2, 3}}, + }, + }, + }, + { + a: base, + b: &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 5000}, + }, + }, + dimension: "cpu", + }, + { + a: base, + b: &ComparableResources{ + Flattened: AllocatedTaskResources{ + Cpu: AllocatedCpuResources{CpuShares: 1000, ReservedCores: []uint16{3, 4}}, + }, + }, + dimension: "cores", + }, + } + + for _, c := range cases { + fit, dim := c.a.Superset(c.b) + if c.dimension == "" { + require.True(t, fit) + } else { + require.False(t, fit) + require.Equal(t, c.dimension, dim) + } + } +} + func requireErrors(t *testing.T, err error, expected ...string) { t.Helper() require.Error(t, err) diff --git a/scheduler/rank.go b/scheduler/rank.go index efe586efa..6ac93bfda 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -4,6 +4,8 @@ import ( "fmt" "math" + "github.com/hashicorp/nomad/lib/cpuset" + "github.com/hashicorp/nomad/nomad/structs" ) @@ -403,6 +405,38 @@ OUTER: } } + // Check if we need to allocate any reserved cores + if task.Resources.Cores > 0 { + // set of reservable CPUs for the node + nodeCPUSet := cpuset.New(option.Node.NodeResources.Cpu.ReservableCpuCores...) 
+ // set of all reserved CPUs on the node + allocatedCPUSet := cpuset.New() + for _, alloc := range proposed { + allocatedCPUSet = allocatedCPUSet.Union(cpuset.New(alloc.ComparableResources().Flattened.Cpu.ReservedCores...)) + } + + // add any cores that were reserved for other tasks + for _, tr := range total.Tasks { + allocatedCPUSet = allocatedCPUSet.Union(cpuset.New(tr.Cpu.ReservedCores...)) + } + + // set of CPUs not yet reserved on the node + availableCPUSet := nodeCPUSet.Difference(allocatedCPUSet) + + // If not enough cores are available mark the node as exhausted + if availableCPUSet.Size() < task.Resources.Cores { + // TODO preemption + iter.ctx.Metrics().ExhaustedNode(option.Node, "cores") + continue OUTER + } + + // Set the task's reserved cores + taskResources.Cpu.ReservedCores = availableCPUSet.ToSlice()[0:task.Resources.Cores] + // Total CPU usage on the node is still tracked by CPUShares. Even though the task will have the entire + // core reserved, we still track overall usage by cpu shares. 
+ taskResources.Cpu.CpuShares = option.Node.NodeResources.Cpu.SharesPerCore() * int64(task.Resources.Cores) + } + // Store the task resource option.SetTaskResources(task, taskResources) diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go index 2cdad5485..5d91dee0f 100644 --- a/scheduler/rank_test.go +++ b/scheduler/rank_test.go @@ -589,6 +589,122 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) { } } +func TestBinPackIterator_ReservedCores(t *testing.T) { + state, ctx := testContext(t) + nodes := []*RankedNode{ + { + Node: &structs.Node{ + // Perfect fit + ID: uuid.Generate(), + NodeResources: &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 2048, + TotalCpuCores: 2, + ReservableCpuCores: []uint16{0, 1}, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 2048, + }, + }, + }, + }, + { + Node: &structs.Node{ + // Perfect fit + ID: uuid.Generate(), + NodeResources: &structs.NodeResources{ + Cpu: structs.NodeCpuResources{ + CpuShares: 2048, + TotalCpuCores: 2, + ReservableCpuCores: []uint16{0, 1}, + }, + Memory: structs.NodeMemoryResources{ + MemoryMB: 2048, + }, + }, + }, + }, + } + static := NewStaticRankIterator(ctx, nodes) + + // Add existing allocations + j1, j2 := mock.Job(), mock.Job() + alloc1 := &structs.Allocation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[0].Node.ID, + JobID: j1.ID, + Job: j1, + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 2048, + ReservedCores: []uint16{0, 1}, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 2048, + }, + }, + }, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", + } + alloc2 := &structs.Allocation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + EvalID: uuid.Generate(), + NodeID: nodes[1].Node.ID, + JobID: 
j2.ID, + Job: j2, + AllocatedResources: &structs.AllocatedResources{ + Tasks: map[string]*structs.AllocatedTaskResources{ + "web": { + Cpu: structs.AllocatedCpuResources{ + CpuShares: 1024, + ReservedCores: []uint16{0}, + }, + Memory: structs.AllocatedMemoryResources{ + MemoryMB: 1024, + }, + }, + }, + }, + DesiredStatus: structs.AllocDesiredStatusRun, + ClientStatus: structs.AllocClientStatusPending, + TaskGroup: "web", + } + require.NoError(t, state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID))) + require.NoError(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID))) + require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2})) + + taskGroup := &structs.TaskGroup{ + EphemeralDisk: &structs.EphemeralDisk{}, + Tasks: []*structs.Task{ + { + Name: "web", + Resources: &structs.Resources{ + Cores: 1, + MemoryMB: 1024, + }, + }, + }, + } + binp := NewBinPackIterator(ctx, static, false, 0, structs.SchedulerAlgorithmBinpack) + binp.SetTaskGroup(taskGroup) + + scoreNorm := NewScoreNormalizationIterator(ctx, binp) + + out := collectRanked(scoreNorm) + require := require.New(t) + require.Len(out, 1) + require.Equal(nodes[1].Node.ID, out[0].Node.ID) + require.Equal([]uint16{1}, out[0].TaskResources["web"].Cpu.ReservedCores) +} + func TestBinPackIterator_ExistingAlloc(t *testing.T) { state, ctx := testContext(t) nodes := []*RankedNode{ diff --git a/scheduler/util.go b/scheduler/util.go index 86461a8f6..2b3ded929 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -425,6 +425,8 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool { // Inspect the non-network resources if ar, br := at.Resources, bt.Resources; ar.CPU != br.CPU { return true + } else if ar.Cores != br.Cores { + return true } else if ar.MemoryMB != br.MemoryMB { return true } else if !ar.Devices.Equals(&br.Devices) { diff --git a/scheduler/util_test.go b/scheduler/util_test.go index fba5e611a..4fa17ce9d 100644 --- 
a/scheduler/util_test.go +++ b/scheduler/util_test.go @@ -674,13 +674,18 @@ func TestTasksUpdated(t *testing.T) { // Change network mode j19 := mock.Job() + j19.TaskGroups[0].Networks[0].Mode = "bridge" + require.True(t, tasksUpdated(j1, j19, name)) + // Change cores resource j20 := mock.Job() + j20.TaskGroups[0].Tasks[0].Resources.CPU = 0 + j20.TaskGroups[0].Tasks[0].Resources.Cores = 2 + j21 := mock.Job() + j21.TaskGroups[0].Tasks[0].Resources.CPU = 0 + j21.TaskGroups[0].Tasks[0].Resources.Cores = 4 + require.True(t, tasksUpdated(j20, j21, name)) - require.False(t, tasksUpdated(j19, j20, name)) - - j20.TaskGroups[0].Networks[0].Mode = "bridge" - require.True(t, tasksUpdated(j19, j20, name)) } func TestTasksUpdated_connectServiceUpdated(t *testing.T) { diff --git a/vendor/github.com/hashicorp/nomad/api/nodes.go b/vendor/github.com/hashicorp/nomad/api/nodes.go index 4dc399e3f..3d951030e 100644 --- a/vendor/github.com/hashicorp/nomad/api/nodes.go +++ b/vendor/github.com/hashicorp/nomad/api/nodes.go @@ -508,7 +508,9 @@ type NodeResources struct { } type NodeCpuResources struct { - CpuShares int64 + CpuShares int64 + TotalCpuCores uint16 + ReservableCpuCores []uint16 } type NodeMemoryResources struct { diff --git a/vendor/github.com/hashicorp/nomad/api/resources.go b/vendor/github.com/hashicorp/nomad/api/resources.go index 5b5501a5c..ae420b22f 100644 --- a/vendor/github.com/hashicorp/nomad/api/resources.go +++ b/vendor/github.com/hashicorp/nomad/api/resources.go @@ -8,6 +8,7 @@ import ( // a given task or task group. type Resources struct { CPU *int `hcl:"cpu,optional"` + Cores *int `hcl:"cores,optional"` MemoryMB *int `mapstructure:"memory" hcl:"memory,optional"` DiskMB *int `mapstructure:"disk" hcl:"disk,optional"` Networks []*NetworkResource `hcl:"network,block"` @@ -24,9 +25,21 @@ type Resources struct { // where they are not provided. 
func (r *Resources) Canonicalize() { defaultResources := DefaultResources() - if r.CPU == nil { - r.CPU = defaultResources.CPU + if r.Cores == nil { + r.Cores = defaultResources.Cores + + // only set cpu to the default value if it and cores is not defined + if r.CPU == nil { + r.CPU = defaultResources.CPU + } } + + // CPU will be set to the default if cores is nil above. + // If cpu is nil here then cores has been set and cpu should be 0 + if r.CPU == nil { + r.CPU = intToPtr(0) + } + if r.MemoryMB == nil { r.MemoryMB = defaultResources.MemoryMB } @@ -42,6 +55,7 @@ func (r *Resources) Canonicalize() { func DefaultResources() *Resources { return &Resources{ CPU: intToPtr(100), + Cores: intToPtr(0), MemoryMB: intToPtr(300), } } @@ -54,6 +68,7 @@ func DefaultResources() *Resources { func MinResources() *Resources { return &Resources{ CPU: intToPtr(1), + Cores: intToPtr(0), MemoryMB: intToPtr(10), } }