oversubscription: Add MemoryMaxMB to internal structs

Start tracking a new MemoryMaxMB field that represents the maximum memory a task
may use on the client. This allows a task to specify a memory reservation (used
by the scheduler when placing the task) while still using excess memory
available on the client, if the client has any.

This commit adds the server-side tracking for the value and ensures that an
allocation's AllocatedResources fields include the value.
This commit is contained in:
Mahmood Ali
2021-03-26 16:01:27 -04:00
parent 14568b3e00
commit 80faab0f79
9 changed files with 415 additions and 22 deletions

View File

@@ -1399,6 +1399,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) {
// Resources should be recomputed
origResources.DiskMB = alloc.Job.TaskGroups[0].EphemeralDisk.SizeMB
origResources.MemoryMaxMB = origResources.MemoryMB
alloc.Resources = origResources
if !reflect.DeepEqual(alloc, out) {
t.Fatalf("not equal: % #v", pretty.Diff(alloc, out))

View File

@@ -62,5 +62,5 @@ func TestPlanNormalize(t *testing.T) {
}
optimizedLogSize := buf.Len()
assert.True(t, float64(optimizedLogSize)/float64(unoptimizedLogSize) < 0.62)
assert.Less(t, float64(optimizedLogSize)/float64(unoptimizedLogSize), 0.63)
}

View File

@@ -4547,6 +4547,109 @@ func TestTaskDiff(t *testing.T) {
Old: "100",
New: "100",
},
{
Type: DiffTypeNone,
Name: "MemoryMaxMB",
Old: "0",
New: "0",
},
},
},
},
},
},
{
Name: "Resources edited memory_max",
Old: &Task{
Resources: &Resources{
CPU: 100,
MemoryMB: 100,
MemoryMaxMB: 200,
DiskMB: 100,
},
},
New: &Task{
Resources: &Resources{
CPU: 100,
MemoryMB: 100,
MemoryMaxMB: 300,
DiskMB: 100,
},
},
Expected: &TaskDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeEdited,
Name: "Resources",
Fields: []*FieldDiff{
{
Type: DiffTypeEdited,
Name: "MemoryMaxMB",
Old: "200",
New: "300",
},
},
},
},
},
},
{
Name: "Resources edited memory_max with context",
Contextual: true,
Old: &Task{
Resources: &Resources{
CPU: 100,
MemoryMB: 100,
MemoryMaxMB: 200,
DiskMB: 100,
},
},
New: &Task{
Resources: &Resources{
CPU: 100,
MemoryMB: 100,
MemoryMaxMB: 300,
DiskMB: 100,
},
},
Expected: &TaskDiff{
Type: DiffTypeEdited,
Objects: []*ObjectDiff{
{
Type: DiffTypeEdited,
Name: "Resources",
Fields: []*FieldDiff{
{
Type: DiffTypeNone,
Name: "CPU",
Old: "100",
New: "100",
},
{
Type: DiffTypeNone,
Name: "DiskMB",
Old: "100",
New: "100",
},
{
Type: DiffTypeNone,
Name: "IOPS",
Old: "0",
New: "0",
},
{
Type: DiffTypeNone,
Name: "MemoryMB",
Old: "100",
New: "100",
},
{
Type: DiffTypeEdited,
Name: "MemoryMaxMB",
Old: "200",
New: "300",
},
},
},
},
@@ -4906,6 +5009,12 @@ func TestTaskDiff(t *testing.T) {
Old: "100",
New: "100",
},
{
Type: DiffTypeNone,
Name: "MemoryMaxMB",
Old: "0",
New: "0",
},
},
Objects: []*ObjectDiff{
{

View File

@@ -552,6 +552,61 @@ func TestAllocsFit_Devices(t *testing.T) {
require.True(fit)
}
// TestAllocsFit_MemoryOversubscription asserts that only the reserved
// MemoryMB counts against node capacity: MemoryMaxMB is summed into the
// reported usage but may exceed the node's memory while allocations still fit.
func TestAllocsFit_MemoryOversubscription(t *testing.T) {
	node := &Node{
		NodeResources: &NodeResources{
			Cpu:    NodeCpuResources{CpuShares: 2000},
			Memory: NodeMemoryResources{MemoryMB: 2048},
		},
	}

	alloc := &Allocation{
		AllocatedResources: &AllocatedResources{
			Tasks: map[string]*AllocatedTaskResources{
				"web": {
					Cpu:    AllocatedCpuResources{CpuShares: 100},
					Memory: AllocatedMemoryResources{MemoryMB: 1000, MemoryMaxMB: 4000},
				},
			},
		},
	}

	// Each step proposes one more copy of the same allocation. One and two
	// copies reserve 1000 MB and 2000 MB and should fit; three copies reserve
	// 3000 MB against the node's 2048 MB and should not.
	steps := []struct {
		copies    int
		fits      bool
		cpuShares int
		memoryMB  int
		memoryMax int
	}{
		{copies: 1, fits: true, cpuShares: 100, memoryMB: 1000, memoryMax: 4000},
		{copies: 2, fits: true, cpuShares: 200, memoryMB: 2000, memoryMax: 8000},
		{copies: 3, fits: false, cpuShares: 300, memoryMB: 3000, memoryMax: 12000},
	}

	for _, step := range steps {
		proposed := make([]*Allocation, step.copies)
		for i := range proposed {
			proposed[i] = alloc
		}

		fit, _, used, err := AllocsFit(node, proposed, nil, false)
		require.NoError(t, err)
		require.Equal(t, step.fits, fit)
		require.EqualValues(t, step.cpuShares, used.Flattened.Cpu.CpuShares)
		require.EqualValues(t, step.memoryMB, used.Flattened.Memory.MemoryMB)
		require.EqualValues(t, step.memoryMax, used.Flattened.Memory.MemoryMaxMB)
	}
}
// COMPAT(0.11): Remove in 0.11
func TestScoreFitBinPack_Old(t *testing.T) {
node := &Node{}

View File

@@ -2179,13 +2179,14 @@ type NodeStubFields struct {
// Resources is used to define the resources available
// on a client
type Resources struct {
CPU int
Cores int
MemoryMB int
DiskMB int
IOPS int // COMPAT(0.10): Only being used to issue warnings
Networks Networks
Devices ResourceDevices
CPU int
Cores int
MemoryMB int
MemoryMaxMB int
DiskMB int
IOPS int // COMPAT(0.10): Only being used to issue warnings
Networks Networks
Devices ResourceDevices
}
const (
@@ -2244,6 +2245,10 @@ func (r *Resources) Validate() error {
}
}
if r.MemoryMaxMB != 0 && r.MemoryMaxMB < r.MemoryMB {
mErr.Errors = append(mErr.Errors, fmt.Errorf("MemoryMaxMB value (%d) should be larger than MemoryMB value (%d)", r.MemoryMaxMB, r.MemoryMB))
}
return mErr.ErrorOrNil()
}
@@ -2259,6 +2264,9 @@ func (r *Resources) Merge(other *Resources) {
if other.MemoryMB != 0 {
r.MemoryMB = other.MemoryMB
}
if other.MemoryMaxMB != 0 {
r.MemoryMaxMB = other.MemoryMaxMB
}
if other.DiskMB != 0 {
r.DiskMB = other.DiskMB
}
@@ -2281,6 +2289,7 @@ func (r *Resources) Equals(o *Resources) bool {
return r.CPU == o.CPU &&
r.Cores == o.Cores &&
r.MemoryMB == o.MemoryMB &&
r.MemoryMaxMB == o.MemoryMaxMB &&
r.DiskMB == o.DiskMB &&
r.IOPS == o.IOPS &&
r.Networks.Equals(&o.Networks) &&
@@ -2387,6 +2396,11 @@ func (r *Resources) Add(delta *Resources) {
r.CPU += delta.CPU
r.MemoryMB += delta.MemoryMB
if delta.MemoryMaxMB > 0 {
r.MemoryMaxMB += delta.MemoryMaxMB
} else {
r.MemoryMaxMB += delta.MemoryMB
}
r.DiskMB += delta.DiskMB
for _, n := range delta.Networks {
@@ -3460,9 +3474,10 @@ func (a *AllocatedResources) OldTaskResources() map[string]*Resources {
m := make(map[string]*Resources, len(a.Tasks))
for name, res := range a.Tasks {
m[name] = &Resources{
CPU: int(res.Cpu.CpuShares),
MemoryMB: int(res.Memory.MemoryMB),
Networks: res.Networks,
CPU: int(res.Cpu.CpuShares),
MemoryMB: int(res.Memory.MemoryMB),
MemoryMaxMB: int(res.Memory.MemoryMaxMB),
Networks: res.Networks,
}
}
@@ -3589,7 +3604,8 @@ func (a *AllocatedTaskResources) Comparable() *ComparableResources {
ReservedCores: a.Cpu.ReservedCores,
},
Memory: AllocatedMemoryResources{
MemoryMB: a.Memory.MemoryMB,
MemoryMB: a.Memory.MemoryMB,
MemoryMaxMB: a.Memory.MemoryMaxMB,
},
},
}
@@ -3709,7 +3725,8 @@ func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) {
// AllocatedMemoryResources captures the allocated memory resources.
type AllocatedMemoryResources struct {
MemoryMB int64
MemoryMB int64
MemoryMaxMB int64
}
func (a *AllocatedMemoryResources) Add(delta *AllocatedMemoryResources) {
@@ -3718,6 +3735,11 @@ func (a *AllocatedMemoryResources) Add(delta *AllocatedMemoryResources) {
}
a.MemoryMB += delta.MemoryMB
if delta.MemoryMaxMB != 0 {
a.MemoryMaxMB += delta.MemoryMaxMB
} else {
a.MemoryMaxMB += delta.MemoryMB
}
}
func (a *AllocatedMemoryResources) Subtract(delta *AllocatedMemoryResources) {
@@ -3726,6 +3748,11 @@ func (a *AllocatedMemoryResources) Subtract(delta *AllocatedMemoryResources) {
}
a.MemoryMB -= delta.MemoryMB
if delta.MemoryMaxMB != 0 {
a.MemoryMaxMB -= delta.MemoryMaxMB
} else {
a.MemoryMaxMB -= delta.MemoryMB
}
}
func (a *AllocatedMemoryResources) Max(other *AllocatedMemoryResources) {
@@ -3736,6 +3763,9 @@ func (a *AllocatedMemoryResources) Max(other *AllocatedMemoryResources) {
if other.MemoryMB > a.MemoryMB {
a.MemoryMB = other.MemoryMB
}
if other.MemoryMaxMB > a.MemoryMaxMB {
a.MemoryMaxMB = other.MemoryMaxMB
}
}
type AllocatedDevices []*AllocatedDeviceResource

View File

@@ -1474,6 +1474,7 @@ func TestTask_Validate_Resources(t *testing.T) {
cases := []struct {
name string
res *Resources
err string
}{
{
name: "Minimum",
@@ -1486,9 +1487,10 @@ func TestTask_Validate_Resources(t *testing.T) {
{
name: "Full",
res: &Resources{
CPU: 1000,
MemoryMB: 1000,
IOPS: 1000,
CPU: 1000,
MemoryMB: 1000,
MemoryMaxMB: 2000,
IOPS: 1000,
Networks: []*NetworkResource{
{
Mode: "host",
@@ -1521,12 +1523,43 @@ func TestTask_Validate_Resources(t *testing.T) {
},
},
},
{
name: "too little cpu",
res: &Resources{
CPU: 0,
MemoryMB: 200,
},
err: "minimum CPU value is 1",
},
{
name: "too little memory",
res: &Resources{
CPU: 100,
MemoryMB: 1,
},
err: "minimum MemoryMB value is 10; got 1",
},
{
name: "too little memory max",
res: &Resources{
CPU: 100,
MemoryMB: 200,
MemoryMaxMB: 10,
},
err: "MemoryMaxMB value (10) should be larger than MemoryMB value (200",
},
}
for i := range cases {
tc := cases[i]
t.Run(tc.name, func(t *testing.T) {
require.NoError(t, tc.res.Validate())
err := tc.res.Validate()
if tc.err == "" {
require.NoError(t, err)
} else {
require.Error(t, err)
require.Contains(t, err.Error(), tc.err)
}
})
}
}
@@ -2711,7 +2744,8 @@ func TestComparableResources_Subtract(t *testing.T) {
ReservedCores: []uint16{0, 1},
},
Memory: AllocatedMemoryResources{
MemoryMB: 2048,
MemoryMB: 2048,
MemoryMaxMB: 3048,
},
Networks: []*NetworkResource{
{
@@ -2733,7 +2767,8 @@ func TestComparableResources_Subtract(t *testing.T) {
ReservedCores: []uint16{0},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
MemoryMB: 1024,
MemoryMaxMB: 1524,
},
Networks: []*NetworkResource{
{
@@ -2756,7 +2791,8 @@ func TestComparableResources_Subtract(t *testing.T) {
ReservedCores: []uint16{1},
},
Memory: AllocatedMemoryResources{
MemoryMB: 1024,
MemoryMB: 1024,
MemoryMaxMB: 1524,
},
Networks: []*NetworkResource{
{
@@ -2775,6 +2811,29 @@ func TestComparableResources_Subtract(t *testing.T) {
require.Equal(expect, r1)
}
// TestMemoryResources_Add checks how AllocatedMemoryResources.Add accumulates
// MemoryMaxMB, both when the delta leaves it unset and when the delta sets it.
func TestMemoryResources_Add(t *testing.T) {
	total := &AllocatedMemoryResources{}

	// A delta without MemoryMaxMB is expected to count its MemoryMB toward
	// both the reserved total and the max total.
	withoutMax := &AllocatedMemoryResources{MemoryMB: 100}
	total.Add(withoutMax)
	wantAfterFirst := &AllocatedMemoryResources{MemoryMB: 100, MemoryMaxMB: 100}
	require.Equal(t, wantAfterFirst, total)

	// A delta with an explicit MemoryMaxMB adds that value to the max total.
	withMax := &AllocatedMemoryResources{MemoryMB: 100, MemoryMaxMB: 200}
	total.Add(withMax)
	wantAfterSecond := &AllocatedMemoryResources{MemoryMB: 200, MemoryMaxMB: 300}
	require.Equal(t, wantAfterSecond, total)
}
func TestEncodeDecode(t *testing.T) {
type FooRequest struct {
Foo string

View File

@@ -105,6 +105,97 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
// TestServiceSched_JobRegister_MemoryMaxHonored registers a single-instance
// job and checks that the task's MemoryMaxMB is carried into every resource
// view of the placed allocation.
func TestServiceSched_JobRegister_MemoryMaxHonored(t *testing.T) {
	testCases := []struct {
		name      string
		cpu       int
		memory    int
		memoryMax int

		// expectedTotalMemoryMax should be SUM(MAX(memory, memoryMax)) for all tasks
		expectedTotalMemoryMax int
	}{
		{
			name:                   "plain no max",
			cpu:                    100,
			memory:                 200,
			memoryMax:              0,
			expectedTotalMemoryMax: 200,
		},
		{
			name:                   "with max",
			cpu:                    100,
			memory:                 200,
			memoryMax:              300,
			expectedTotalMemoryMax: 300,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.name, func(t *testing.T) {
			// Build a one-instance job whose only task uses the case's resources.
			job := mock.Job()
			group := job.TaskGroups[0]
			group.Count = 1

			task := group.Tasks[0].Name
			res := group.Tasks[0].Resources
			res.CPU = tc.cpu
			res.MemoryMB = tc.memory
			res.MemoryMaxMB = tc.memoryMax

			h := NewHarness(t)

			// Create some nodes
			for n := 0; n < 10; n++ {
				require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), mock.Node()))
			}
			require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

			// Create a mock evaluation to register the job
			eval := &structs.Evaluation{
				Namespace:   structs.DefaultNamespace,
				ID:          uuid.Generate(),
				Priority:    job.Priority,
				TriggeredBy: structs.EvalTriggerJobRegister,
				JobID:       job.ID,
				Status:      structs.EvalStatusPending,
			}
			evals := []*structs.Evaluation{eval}
			require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

			// Process the evaluation; exactly one plan is expected.
			require.NoError(t, h.Process(NewServiceScheduler, eval))
			require.Len(t, h.Plans, 1)

			// Exactly one allocation should have been placed for the job.
			placed, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
			require.NoError(t, err)
			require.Len(t, placed, 1)
			alloc := placed[0]

			// Check the per-task AllocatedResources fields.
			allocated := alloc.AllocatedResources.Tasks[task]
			require.Equal(t, int64(tc.cpu), allocated.Cpu.CpuShares)
			require.Equal(t, int64(tc.memory), allocated.Memory.MemoryMB)
			require.Equal(t, int64(tc.memoryMax), allocated.Memory.MemoryMaxMB)

			// Check the old, deprecated per-task TaskResources fields.
			legacy := alloc.TaskResources[task]
			require.Equal(t, tc.cpu, legacy.CPU)
			require.Equal(t, tc.memory, legacy.MemoryMB)
			require.Equal(t, tc.memoryMax, legacy.MemoryMaxMB)

			// Check total resource fields - alloc.Resources is a deprecated
			// field with no modern equivalent.
			require.Equal(t, tc.cpu, alloc.Resources.CPU)
			require.Equal(t, tc.memory, alloc.Resources.MemoryMB)
			require.Equal(t, tc.expectedTotalMemoryMax, alloc.Resources.MemoryMaxMB)
		})
	}
}
func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
h := NewHarness(t)
@@ -4543,7 +4634,7 @@ func TestBatchSched_ScaleDown_SameName(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestGenericSched_AllocFit(t *testing.T) {
func TestGenericSched_AllocFit_Lifecycle(t *testing.T) {
testCases := []struct {
Name string
NodeCpu int64
@@ -4661,6 +4752,51 @@ func TestGenericSched_AllocFit(t *testing.T) {
}
}
// TestGenericSched_AllocFit_MemoryOversubscription schedules a ten-instance
// job on a single memory-constrained node and expects the number of placed
// allocations to follow from the task's MemoryMB, not its larger MemoryMaxMB.
func TestGenericSched_AllocFit_MemoryOversubscription(t *testing.T) {
	h := NewHarness(t)

	// One node with plenty of CPU, so memory is the limiting resource.
	node := mock.Node()
	node.NodeResources.Cpu.CpuShares = 10000
	node.NodeResources.Memory.MemoryMB = 1224
	node.ReservedResources.Memory.MemoryMB = 60
	require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))

	// Ask for more instances than the node can hold.
	job := mock.Job()
	job.TaskGroups[0].Count = 10
	res := job.TaskGroups[0].Tasks[0].Resources
	res.CPU = 100
	res.MemoryMB = 200
	res.MemoryMaxMB = 500
	res.DiskMB = 1
	require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job))

	// Create a mock evaluation to register the job
	eval := &structs.Evaluation{
		Namespace:   structs.DefaultNamespace,
		ID:          uuid.Generate(),
		Priority:    job.Priority,
		TriggeredBy: structs.EvalTriggerJobRegister,
		JobID:       job.ID,
		Status:      structs.EvalStatusPending,
	}
	evals := []*structs.Evaluation{eval}
	require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), evals))

	// Process the evaluation
	require.NoError(t, h.Process(NewServiceScheduler, eval))

	// expectedAllocs should be floor((nodeResources.MemoryMB-reservedResources.MemoryMB) / job.MemoryMB)
	// which is floor((1224-60)/200) = 5 here.
	expectedAllocs := 5
	require.Len(t, h.Plans, 1)

	// Lookup the allocations by JobID
	ws := memdb.NewWatchSet()
	placed, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false)
	require.NoError(t, err)
	require.Len(t, placed, expectedAllocs)

	h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
func TestGenericSched_ChainedAlloc(t *testing.T) {
h := NewHarness(t)

View File

@@ -304,7 +304,8 @@ OUTER:
CpuShares: int64(task.Resources.CPU),
},
Memory: structs.AllocatedMemoryResources{
MemoryMB: int64(task.Resources.MemoryMB),
MemoryMB: int64(task.Resources.MemoryMB),
MemoryMaxMB: int64(task.Resources.MemoryMaxMB),
},
}

View File

@@ -429,6 +429,8 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool {
return true
} else if ar.MemoryMB != br.MemoryMB {
return true
} else if ar.MemoryMaxMB != br.MemoryMaxMB {
return true
} else if !ar.Devices.Equals(&br.Devices) {
return true
}