scheduler: implement scheduling of reserved cores

Nick Ethier
2021-03-19 00:29:07 -04:00
parent 5109256c78
commit 94c7aec159
8 changed files with 331 additions and 8 deletions

View File

@@ -79,6 +79,15 @@ func (s CPUSet) IsSubsetOf(other CPUSet) bool {
return true
}
func (s CPUSet) IsSupersetOf(other CPUSet) bool {
for cpu := range other.cpus {
if _, ok := s.cpus[cpu]; !ok {
return false
}
}
return true
}
// Equals tests the equality of the elements in the CPUSet
func (s CPUSet) Equals(other CPUSet) bool {
return reflect.DeepEqual(s.cpus, other.cpus)
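
The added IsSupersetOf is the mirror image of IsSubsetOf: it walks the other set's elements and checks membership in the receiver. A minimal standalone sketch (not part of the commit) showing how the two predicates and Difference interact, using only operations that appear elsewhere in this diff:

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/lib/cpuset"
)

func main() {
	node := cpuset.New(0, 1, 2, 3) // cores a node could reserve
	claimed := cpuset.New(1, 2)    // cores already reserved by an alloc

	// The two predicates are mirror images of each other.
	fmt.Println(claimed.IsSubsetOf(node))   // true
	fmt.Println(node.IsSupersetOf(claimed)) // true

	// Difference yields the cores still free for new reservations.
	fmt.Println(node.Difference(claimed).Size()) // 2
}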

View File

@@ -63,7 +63,7 @@ func TestCPUSet_Equals(t *testing.T) {
}
}
-func TestCpuSet_Union(t *testing.T) {
+func TestCPUSet_Union(t *testing.T) {
cases := []struct {
a CPUSet
b CPUSet
@@ -86,7 +86,7 @@ func TestCpuSet_Union(t *testing.T) {
}
}
-func TestCpuSet_Difference(t *testing.T) {
+func TestCPUSet_Difference(t *testing.T) {
cases := []struct {
a CPUSet
b CPUSet
@@ -108,6 +108,44 @@ func TestCpuSet_Difference(t *testing.T) {
}
}
func TestCPUSet_IsSubsetOf(t *testing.T) {
cases := []struct {
a CPUSet
b CPUSet
isSubset bool
}{
{New(0), New(0), true},
{New(), New(0), true},
{New(0), New(), false},
{New(1, 2), New(0, 1, 2, 3), true},
{New(2, 1), New(0, 1, 2, 3), true},
{New(3, 4), New(0, 1, 2, 3), false},
}
for _, c := range cases {
require.Equal(t, c.isSubset, c.a.IsSubsetOf(c.b))
}
}
func TestCPUSet_IsSupersetOf(t *testing.T) {
cases := []struct {
a CPUSet
b CPUSet
isSuperset bool
}{
{New(0), New(0), true},
{New(0), New(), true},
{New(), New(0), false},
{New(0, 1, 2, 3), New(0), true},
{New(0, 1, 2, 3), New(2, 3), true},
{New(0, 1, 2, 3), New(2, 3, 4), false},
}
for _, c := range cases {
require.Equal(t, c.isSuperset, c.a.IsSupersetOf(c.b))
}
}
func TestParse(t *testing.T) {
cases := []struct {
cpuset string

View File

@@ -98,6 +98,9 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
// Compute the allocs' utilization from zero
used := new(ComparableResources)
reservedCores := map[uint16]struct{}{}
var coreOverlap bool
// For each alloc, add the resources
for _, alloc := range allocs {
// Do not consider the resource impact of terminal allocations
@@ -105,7 +108,21 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
continue
}
-used.Add(alloc.ComparableResources())
+cr := alloc.ComparableResources()
+used.Add(cr)
// Adding the comparable resources unions the reserved core sets, so we still need to check whether any reserved cores overlap between allocations
for _, core := range cr.Flattened.Cpu.ReservedCores {
if _, ok := reservedCores[core]; ok {
coreOverlap = true
} else {
reservedCores[core] = struct{}{}
}
}
}
if coreOverlap {
return false, "cores", used, nil
}
// Check that the node resources (after subtracting reserved) are a
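
The guard added to AllocsFit reduces to duplicate detection over core IDs while per-alloc usage is summed. A standalone sketch of the same bookkeeping (coresOverlap is a hypothetical helper, not something this commit adds):

// coresOverlap reports whether any core ID appears in more than one
// reservation list, which is what the reservedCores map above tracks
// across non-terminal allocations.
func coresOverlap(reservations ...[]uint16) bool {
	seen := map[uint16]struct{}{}
	for _, cores := range reservations {
		for _, core := range cores {
			if _, ok := seen[core]; ok {
				return true
			}
			seen[core] = struct{}{}
		}
	}
	return false
}

With each alloc flattened to its ReservedCores slice, a true result corresponds to the early return on the "cores" dimension above.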

View File

@@ -269,7 +269,9 @@ func TestAllocsFit(t *testing.T) {
n := &Node{
NodeResources: &NodeResources{
Cpu: NodeCpuResources{
-CpuShares: 2000,
+CpuShares:          2000,
+TotalCpuCores:      2,
+ReservableCpuCores: []uint16{0, 1},
},
Memory: NodeMemoryResources{
MemoryMB: 2048,
@@ -317,7 +319,8 @@ func TestAllocsFit(t *testing.T) {
Tasks: map[string]*AllocatedTaskResources{
"web": {
Cpu: AllocatedCpuResources{
-CpuShares: 1000,
+CpuShares:     1000,
+ReservedCores: []uint16{},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
@@ -345,9 +348,9 @@ func TestAllocsFit(t *testing.T) {
}
// Should fit one allocation
-fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
+fit, dim, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
require.NoError(err)
-require.True(fit)
+require.True(fit, "failed for dimension %q", dim)
require.EqualValues(1000, used.Flattened.Cpu.CpuShares)
require.EqualValues(1024, used.Flattened.Memory.MemoryMB)
@@ -357,6 +360,48 @@ func TestAllocsFit(t *testing.T) {
require.False(fit)
require.EqualValues(2000, used.Flattened.Cpu.CpuShares)
require.EqualValues(2048, used.Flattened.Memory.MemoryMB)
a2 := &Allocation{
AllocatedResources: &AllocatedResources{
Tasks: map[string]*AllocatedTaskResources{
"web": {
Cpu: AllocatedCpuResources{
CpuShares: 500,
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 512,
},
},
},
Shared: AllocatedSharedResources{
DiskMB: 1000,
Networks: Networks{
{
Mode: "host",
IP: "10.0.0.1",
},
},
},
},
}
// Should fit one allocation
fit, dim, used, err = AllocsFit(n, []*Allocation{a2}, nil, false)
require.NoError(err)
require.True(fit, "failed for dimension %q", dim)
require.EqualValues(500, used.Flattened.Cpu.CpuShares)
require.EqualValues([]uint16{0}, used.Flattened.Cpu.ReservedCores)
require.EqualValues(512, used.Flattened.Memory.MemoryMB)
// Should not fit second allocation
fit, dim, used, err = AllocsFit(n, []*Allocation{a2, a2}, nil, false)
require.NoError(err)
require.False(fit)
require.EqualValues("cores", dim)
require.EqualValues(1000, used.Flattened.Cpu.CpuShares)
require.EqualValues([]uint16{0}, used.Flattened.Cpu.ReservedCores)
require.EqualValues(1024, used.Flattened.Memory.MemoryMB)
}
func TestAllocsFit_TerminalAlloc(t *testing.T) {

View File

@@ -3844,7 +3844,8 @@ func (c *ComparableResources) Superset(other *ComparableResources) (bool, string
if c.Flattened.Cpu.CpuShares < other.Flattened.Cpu.CpuShares {
return false, "cpu"
}
-if len(c.Flattened.Cpu.ReservedCores) > 0 && cpuset.New(other.Flattened.Cpu.ReservedCores...).IsSubsetOf(cpuset.New(c.Flattened.Cpu.ReservedCores...)) {
+if len(c.Flattened.Cpu.ReservedCores) > 0 && !cpuset.New(c.Flattened.Cpu.ReservedCores...).IsSupersetOf(cpuset.New(other.Flattened.Cpu.ReservedCores...)) {
return false, "cores"
}
if c.Flattened.Memory.MemoryMB < other.Flattened.Memory.MemoryMB {
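
The corrected condition reads: when the holder advertises reserved cores at all, its core set must cover every core the other side asks for, otherwise the fit fails on the "cores" dimension. A hypothetical condensation of that predicate (fitsCores does not exist in the codebase; the cpuset package is the lib/cpuset one shown in this diff):

package illustrate // hypothetical package, for illustration only

import "github.com/hashicorp/nomad/lib/cpuset"

// fitsCores mirrors the corrected Superset check on the "cores"
// dimension: with no reserved cores on the holder side the check is
// skipped, otherwise the holder's set must cover the requested cores.
func fitsCores(have, want []uint16) bool {
	if len(have) == 0 {
		return true
	}
	return cpuset.New(have...).IsSupersetOf(cpuset.New(want...))
}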

View File

@@ -6012,6 +6012,69 @@ func TestTaskGroup_validateScriptChecksInGroupServices(t *testing.T) {
})
}
func TestComparableResources_Superset(t *testing.T) {
base := &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{
CpuShares: 4000,
ReservedCores: []uint16{0, 1, 2, 3},
},
Memory: AllocatedMemoryResources{MemoryMB: 4096},
},
Shared: AllocatedSharedResources{DiskMB: 10000},
}
cases := []struct {
a *ComparableResources
b *ComparableResources
dimension string
}{
{
a: base,
b: &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{CpuShares: 1000, ReservedCores: []uint16{0}},
},
},
},
{
a: base,
b: &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{CpuShares: 4000, ReservedCores: []uint16{0, 1, 2, 3}},
},
},
},
{
a: base,
b: &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{CpuShares: 5000},
},
},
dimension: "cpu",
},
{
a: base,
b: &ComparableResources{
Flattened: AllocatedTaskResources{
Cpu: AllocatedCpuResources{CpuShares: 1000, ReservedCores: []uint16{3, 4}},
},
},
dimension: "cores",
},
}
for _, c := range cases {
fit, dim := c.a.Superset(c.b)
if c.dimension == "" {
require.True(t, fit)
} else {
require.False(t, fit)
require.Equal(t, c.dimension, dim)
}
}
}
func requireErrors(t *testing.T, err error, expected ...string) {
t.Helper()
require.Error(t, err)

View File

@@ -4,6 +4,8 @@ import (
"fmt"
"math"
"github.com/hashicorp/nomad/lib/cpuset"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -403,6 +405,38 @@ OUTER:
}
}
// Check if we need to allocate any reserved cores
if task.Resources.Cores > 0 {
// set of reservable CPUs for the node
nodeCPUSet := cpuset.New(option.Node.NodeResources.Cpu.ReservableCpuCores...)
// set of all reserved CPUs on the node
allocatedCPUSet := cpuset.New()
for _, alloc := range proposed {
allocatedCPUSet = allocatedCPUSet.Union(cpuset.New(alloc.ComparableResources().Flattened.Cpu.ReservedCores...))
}
// add any cores that were reserved for other tasks
for _, tr := range total.Tasks {
allocatedCPUSet = allocatedCPUSet.Union(cpuset.New(tr.Cpu.ReservedCores...))
}
// set of CPUs not yet reserved on the node
availableCPUSet := nodeCPUSet.Difference(allocatedCPUSet)
// If not enough cores are available mark the node as exhausted
if availableCPUSet.Size() < task.Resources.Cores {
// TODO preemption
iter.ctx.Metrics().ExhaustedNode(option.Node, "cores")
continue OUTER
}
// Set the task's reserved cores
taskResources.Cpu.ReservedCores = availableCPUSet.ToSlice()[0:task.Resources.Cores]
// Total CPU usage on the node is still tracked in CpuShares: even though the task
// reserves whole cores, its overall usage is charged as shares per core.
taskResources.Cpu.CpuShares = option.Node.NodeResources.Cpu.SharesPerCore() * int64(task.Resources.Cores)
}
// Store the task resource
option.SetTaskResources(task, taskResources)
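
Stripped of the iterator plumbing, the core selection above is a set difference followed by a prefix slice, with exhaustion reported when too few cores remain. A hypothetical condensation (pickCores stands in for the inline code; the cpuset operations are the ones used above):

package illustrate // hypothetical package, for illustration only

import "github.com/hashicorp/nomad/lib/cpuset"

// pickCores removes every already-reserved core from the node's
// reservable set and takes the first want cores, or reports that the
// node is exhausted on the "cores" dimension.
func pickCores(reservable, reserved []uint16, want int) ([]uint16, bool) {
	available := cpuset.New(reservable...).Difference(cpuset.New(reserved...))
	if available.Size() < want {
		return nil, false
	}
	return available.ToSlice()[:want], true
}

Charging CpuShares as SharesPerCore() times the number of reserved cores keeps the node's aggregate CPU accounting consistent whether or not a task pins cores.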

View File

@@ -589,6 +589,122 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
}
}
func TestBinPackIterator_ReservedCores(t *testing.T) {
state, ctx := testContext(t)
nodes := []*RankedNode{
{
Node: &structs.Node{
// Perfect fit
ID: uuid.Generate(),
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{
CpuShares: 2048,
TotalCpuCores: 2,
ReservableCpuCores: []uint16{0, 1},
},
Memory: structs.NodeMemoryResources{
MemoryMB: 2048,
},
},
},
},
{
Node: &structs.Node{
// Perfect fit
ID: uuid.Generate(),
NodeResources: &structs.NodeResources{
Cpu: structs.NodeCpuResources{
CpuShares: 2048,
TotalCpuCores: 2,
ReservableCpuCores: []uint16{0, 1},
},
Memory: structs.NodeMemoryResources{
MemoryMB: 2048,
},
},
},
},
}
static := NewStaticRankIterator(ctx, nodes)
// Add existing allocations
j1, j2 := mock.Job(), mock.Job()
alloc1 := &structs.Allocation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[0].Node.ID,
JobID: j1.ID,
Job: j1,
AllocatedResources: &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 2048,
ReservedCores: []uint16{0, 1},
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 2048,
},
},
},
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
TaskGroup: "web",
}
alloc2 := &structs.Allocation{
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
EvalID: uuid.Generate(),
NodeID: nodes[1].Node.ID,
JobID: j2.ID,
Job: j2,
AllocatedResources: &structs.AllocatedResources{
Tasks: map[string]*structs.AllocatedTaskResources{
"web": {
Cpu: structs.AllocatedCpuResources{
CpuShares: 1024,
ReservedCores: []uint16{0},
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: 1024,
},
},
},
},
DesiredStatus: structs.AllocDesiredStatusRun,
ClientStatus: structs.AllocClientStatusPending,
TaskGroup: "web",
}
require.NoError(t, state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)))
require.NoError(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}))
taskGroup := &structs.TaskGroup{
EphemeralDisk: &structs.EphemeralDisk{},
Tasks: []*structs.Task{
{
Name: "web",
Resources: &structs.Resources{
Cores: 1,
MemoryMB: 1024,
},
},
},
}
binp := NewBinPackIterator(ctx, static, false, 0, structs.SchedulerAlgorithmBinpack)
binp.SetTaskGroup(taskGroup)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
require := require.New(t)
require.Len(out, 1)
require.Equal(nodes[1].Node.ID, out[0].Node.ID)
require.Equal([]uint16{1}, out[0].TaskResources["web"].Cpu.ReservedCores)
}
func TestBinPackIterator_ExistingAlloc(t *testing.T) {
state, ctx := testContext(t)
nodes := []*RankedNode{