From 80faab0f791d5dd62b45593116ba428d39ecea71 Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 26 Mar 2021 16:01:27 -0400 Subject: [PATCH] oversubscription: Add MemoryMaxMB to internal structs Start tracking a new MemoryMaxMB field that represents the maximum memory a task may use in the client. This allows tasks to specify a memory reservation (to be used by scheduler when placing the task) but use excess memory used on the client if the client has any. This commit adds the server tracking for the value, and ensures that allocations AllocatedResource fields include the value. --- nomad/fsm_test.go | 1 + nomad/plan_normalization_test.go | 2 +- nomad/structs/diff_test.go | 109 ++++++++++++++++++++++++ nomad/structs/funcs_test.go | 55 ++++++++++++ nomad/structs/structs.go | 54 +++++++++--- nomad/structs/structs_test.go | 73 ++++++++++++++-- scheduler/generic_sched_test.go | 138 ++++++++++++++++++++++++++++++- scheduler/rank.go | 3 +- scheduler/util.go | 2 + 9 files changed, 415 insertions(+), 22 deletions(-) diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index b4c73d5c0..4e75e5336 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -1399,6 +1399,7 @@ func TestFSM_UpsertAllocs_StrippedResources(t *testing.T) { // Resources should be recomputed origResources.DiskMB = alloc.Job.TaskGroups[0].EphemeralDisk.SizeMB + origResources.MemoryMaxMB = origResources.MemoryMB alloc.Resources = origResources if !reflect.DeepEqual(alloc, out) { t.Fatalf("not equal: % #v", pretty.Diff(alloc, out)) diff --git a/nomad/plan_normalization_test.go b/nomad/plan_normalization_test.go index ee478f8d4..2ccfaebc1 100644 --- a/nomad/plan_normalization_test.go +++ b/nomad/plan_normalization_test.go @@ -62,5 +62,5 @@ func TestPlanNormalize(t *testing.T) { } optimizedLogSize := buf.Len() - assert.True(t, float64(optimizedLogSize)/float64(unoptimizedLogSize) < 0.62) + assert.Less(t, float64(optimizedLogSize)/float64(unoptimizedLogSize), 0.63) } diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 98f59ea81..51e307b79 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -4547,6 +4547,109 @@ func TestTaskDiff(t *testing.T) { Old: "100", New: "100", }, + { + Type: DiffTypeNone, + Name: "MemoryMaxMB", + Old: "0", + New: "0", + }, + }, + }, + }, + }, + }, + { + Name: "Resources edited memory_max", + Old: &Task{ + Resources: &Resources{ + CPU: 100, + MemoryMB: 100, + MemoryMaxMB: 200, + DiskMB: 100, + }, + }, + New: &Task{ + Resources: &Resources{ + CPU: 100, + MemoryMB: 100, + MemoryMaxMB: 300, + DiskMB: 100, + }, + }, + Expected: &TaskDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "Resources", + Fields: []*FieldDiff{ + { + Type: DiffTypeEdited, + Name: "MemoryMaxMB", + Old: "200", + New: "300", + }, + }, + }, + }, + }, + }, + { + Name: "Resources edited memory_max with context", + Contextual: true, + Old: &Task{ + Resources: &Resources{ + CPU: 100, + MemoryMB: 100, + MemoryMaxMB: 200, + DiskMB: 100, + }, + }, + New: &Task{ + Resources: &Resources{ + CPU: 100, + MemoryMB: 100, + MemoryMaxMB: 300, + DiskMB: 100, + }, + }, + Expected: &TaskDiff{ + Type: DiffTypeEdited, + Objects: []*ObjectDiff{ + { + Type: DiffTypeEdited, + Name: "Resources", + Fields: []*FieldDiff{ + { + Type: DiffTypeNone, + Name: "CPU", + Old: "100", + New: "100", + }, + { + Type: DiffTypeNone, + Name: "DiskMB", + Old: "100", + New: "100", + }, + { + Type: DiffTypeNone, + Name: "IOPS", + Old: "0", + New: "0", + }, + { + Type: DiffTypeNone, + Name: "MemoryMB", + Old: "100", + New: "100", + }, + { + Type: DiffTypeEdited, + Name: "MemoryMaxMB", + Old: "200", + New: "300", + }, }, }, }, @@ -4906,6 +5009,12 @@ func TestTaskDiff(t *testing.T) { Old: "100", New: "100", }, + { + Type: DiffTypeNone, + Name: "MemoryMaxMB", + Old: "0", + New: "0", + }, }, Objects: []*ObjectDiff{ { diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go index 13a18b2b7..988e5527b 100644 --- a/nomad/structs/funcs_test.go +++ b/nomad/structs/funcs_test.go @@ -552,6 +552,61 @@ func TestAllocsFit_Devices(t *testing.T) { require.True(fit) } +// TestAllocsFit_MemoryOversubscription asserts that only reserved memory is +// used for capacity +func TestAllocsFit_MemoryOversubscription(t *testing.T) { + n := &Node{ + NodeResources: &NodeResources{ + Cpu: NodeCpuResources{ + CpuShares: 2000, + }, + Memory: NodeMemoryResources{ + MemoryMB: 2048, + }, + }, + } + + a1 := &Allocation{ + AllocatedResources: &AllocatedResources{ + Tasks: map[string]*AllocatedTaskResources{ + "web": { + Cpu: AllocatedCpuResources{ + CpuShares: 100, + }, + Memory: AllocatedMemoryResources{ + MemoryMB: 1000, + MemoryMaxMB: 4000, + }, + }, + }, + }, + } + + // Should fit one allocation + fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false) + require.NoError(t, err) + require.True(t, fit) + require.EqualValues(t, 100, used.Flattened.Cpu.CpuShares) + require.EqualValues(t, 1000, used.Flattened.Memory.MemoryMB) + require.EqualValues(t, 4000, used.Flattened.Memory.MemoryMaxMB) + + // Should fit second allocation + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1}, nil, false) + require.NoError(t, err) + require.True(t, fit) + require.EqualValues(t, 200, used.Flattened.Cpu.CpuShares) + require.EqualValues(t, 2000, used.Flattened.Memory.MemoryMB) + require.EqualValues(t, 8000, used.Flattened.Memory.MemoryMaxMB) + + // Should not fit a third allocation + fit, _, used, err = AllocsFit(n, []*Allocation{a1, a1, a1}, nil, false) + require.NoError(t, err) + require.False(t, fit) + require.EqualValues(t, 300, used.Flattened.Cpu.CpuShares) + require.EqualValues(t, 3000, used.Flattened.Memory.MemoryMB) + require.EqualValues(t, 12000, used.Flattened.Memory.MemoryMaxMB) +} + // COMPAT(0.11): Remove in 0.11 func TestScoreFitBinPack_Old(t *testing.T) { node := &Node{} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index a667b7750..09ea6de94 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2179,13 +2179,14 @@ type NodeStubFields struct { // Resources is used to define the resources available // on a client type Resources struct { - CPU int - Cores int - MemoryMB int - DiskMB int - IOPS int // COMPAT(0.10): Only being used to issue warnings - Networks Networks - Devices ResourceDevices + CPU int + Cores int + MemoryMB int + MemoryMaxMB int + DiskMB int + IOPS int // COMPAT(0.10): Only being used to issue warnings + Networks Networks + Devices ResourceDevices } const ( @@ -2244,6 +2245,10 @@ func (r *Resources) Validate() error { } } + if r.MemoryMaxMB != 0 && r.MemoryMaxMB < r.MemoryMB { + mErr.Errors = append(mErr.Errors, fmt.Errorf("MemoryMaxMB value (%d) should be larger than MemoryMB value (%d)", r.MemoryMaxMB, r.MemoryMB)) + } + return mErr.ErrorOrNil() } @@ -2259,6 +2264,9 @@ func (r *Resources) Merge(other *Resources) { if other.MemoryMB != 0 { r.MemoryMB = other.MemoryMB } + if other.MemoryMaxMB != 0 { + r.MemoryMaxMB = other.MemoryMaxMB + } if other.DiskMB != 0 { r.DiskMB = other.DiskMB } @@ -2281,6 +2289,7 @@ func (r *Resources) Equals(o *Resources) bool { return r.CPU == o.CPU && r.Cores == o.Cores && r.MemoryMB == o.MemoryMB && + r.MemoryMaxMB == o.MemoryMaxMB && r.DiskMB == o.DiskMB && r.IOPS == o.IOPS && r.Networks.Equals(&o.Networks) && @@ -2387,6 +2396,11 @@ func (r *Resources) Add(delta *Resources) { r.CPU += delta.CPU r.MemoryMB += delta.MemoryMB + if delta.MemoryMaxMB > 0 { + r.MemoryMaxMB += delta.MemoryMaxMB + } else { + r.MemoryMaxMB += delta.MemoryMB + } r.DiskMB += delta.DiskMB for _, n := range delta.Networks { @@ -3460,9 +3474,10 @@ func (a *AllocatedResources) OldTaskResources() map[string]*Resources { m := make(map[string]*Resources, len(a.Tasks)) for name, res := range a.Tasks { m[name] = &Resources{ - CPU: int(res.Cpu.CpuShares), - MemoryMB: int(res.Memory.MemoryMB), - Networks: res.Networks, + CPU: int(res.Cpu.CpuShares), + MemoryMB: int(res.Memory.MemoryMB), + MemoryMaxMB: int(res.Memory.MemoryMaxMB), + Networks: res.Networks, } } @@ -3589,7 +3604,8 @@ func (a *AllocatedTaskResources) Comparable() *ComparableResources { ReservedCores: a.Cpu.ReservedCores, }, Memory: AllocatedMemoryResources{ - MemoryMB: a.Memory.MemoryMB, + MemoryMB: a.Memory.MemoryMB, + MemoryMaxMB: a.Memory.MemoryMaxMB, }, }, } @@ -3709,7 +3725,8 @@ func (a *AllocatedCpuResources) Max(other *AllocatedCpuResources) { // AllocatedMemoryResources captures the allocated memory resources. type AllocatedMemoryResources struct { - MemoryMB int64 + MemoryMB int64 + MemoryMaxMB int64 } func (a *AllocatedMemoryResources) Add(delta *AllocatedMemoryResources) { @@ -3718,6 +3735,11 @@ func (a *AllocatedMemoryResources) Add(delta *AllocatedMemoryResources) { } a.MemoryMB += delta.MemoryMB + if delta.MemoryMaxMB != 0 { + a.MemoryMaxMB += delta.MemoryMaxMB + } else { + a.MemoryMaxMB += delta.MemoryMB + } } func (a *AllocatedMemoryResources) Subtract(delta *AllocatedMemoryResources) { @@ -3726,6 +3748,11 @@ func (a *AllocatedMemoryResources) Subtract(delta *AllocatedMemoryResources) { } a.MemoryMB -= delta.MemoryMB + if delta.MemoryMaxMB != 0 { + a.MemoryMaxMB -= delta.MemoryMaxMB + } else { + a.MemoryMaxMB -= delta.MemoryMB + } } func (a *AllocatedMemoryResources) Max(other *AllocatedMemoryResources) { @@ -3736,6 +3763,9 @@ func (a *AllocatedMemoryResources) Max(other *AllocatedMemoryResources) { if other.MemoryMB > a.MemoryMB { a.MemoryMB = other.MemoryMB } + if other.MemoryMaxMB > a.MemoryMaxMB { + a.MemoryMaxMB = other.MemoryMaxMB + } } type AllocatedDevices []*AllocatedDeviceResource diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index 4132796a0..be61264d9 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1474,6 +1474,7 @@ func TestTask_Validate_Resources(t *testing.T) { cases := []struct { name string res *Resources + err string }{ { name: "Minimum", @@ -1486,9 +1487,10 @@ func TestTask_Validate_Resources(t *testing.T) { { name: "Full", res: &Resources{ - CPU: 1000, - MemoryMB: 1000, - IOPS: 1000, + CPU: 1000, + MemoryMB: 1000, + MemoryMaxMB: 2000, + IOPS: 1000, Networks: []*NetworkResource{ { Mode: "host", @@ -1521,12 +1523,43 @@ func TestTask_Validate_Resources(t *testing.T) { }, }, }, + { + name: "too little cpu", + res: &Resources{ + CPU: 0, + MemoryMB: 200, + }, + err: "minimum CPU value is 1", + }, + { + name: "too little memory", + res: &Resources{ + CPU: 100, + MemoryMB: 1, + }, + err: "minimum MemoryMB value is 10; got 1", + }, + { + name: "too little memory max", + res: &Resources{ + CPU: 100, + MemoryMB: 200, + MemoryMaxMB: 10, + }, + err: "MemoryMaxMB value (10) should be larger than MemoryMB value (200", + }, } for i := range cases { tc := cases[i] t.Run(tc.name, func(t *testing.T) { - require.NoError(t, tc.res.Validate()) + err := tc.res.Validate() + if tc.err == "" { + require.NoError(t, err) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), tc.err) + } }) } } @@ -2711,7 +2744,8 @@ func TestComparableResources_Subtract(t *testing.T) { ReservedCores: []uint16{0, 1}, }, Memory: AllocatedMemoryResources{ - MemoryMB: 2048, + MemoryMB: 2048, + MemoryMaxMB: 3048, }, Networks: []*NetworkResource{ { @@ -2733,7 +2767,8 @@ func TestComparableResources_Subtract(t *testing.T) { ReservedCores: []uint16{0}, }, Memory: AllocatedMemoryResources{ - MemoryMB: 1024, + MemoryMB: 1024, + MemoryMaxMB: 1524, }, Networks: []*NetworkResource{ { @@ -2756,7 +2791,8 @@ func TestComparableResources_Subtract(t *testing.T) { ReservedCores: []uint16{1}, }, Memory: AllocatedMemoryResources{ - MemoryMB: 1024, + MemoryMB: 1024, + MemoryMaxMB: 1524, }, Networks: []*NetworkResource{ { @@ -2775,6 +2811,29 @@ func TestComparableResources_Subtract(t *testing.T) { require.Equal(expect, r1) } +func TestMemoryResources_Add(t *testing.T) { + r := &AllocatedMemoryResources{} + + // adding plain no max + r.Add(&AllocatedMemoryResources{ + MemoryMB: 100, + }) + require.Equal(t, &AllocatedMemoryResources{ + MemoryMB: 100, + MemoryMaxMB: 100, + }, r) + + // adding with max + r.Add(&AllocatedMemoryResources{ + MemoryMB: 100, + MemoryMaxMB: 200, + }) + require.Equal(t, &AllocatedMemoryResources{ + MemoryMB: 200, + MemoryMaxMB: 300, + }, r) +} + func TestEncodeDecode(t *testing.T) { type FooRequest struct { Foo string diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go index 7c5f7cd7a..92c3f7787 100644 --- a/scheduler/generic_sched_test.go +++ b/scheduler/generic_sched_test.go @@ -105,6 +105,97 @@ func TestServiceSched_JobRegister(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } +func TestServiceSched_JobRegister_MemoryMaxHonored(t *testing.T) { + + cases := []struct { + name string + cpu int + memory int + memoryMax int + + // expectedTotalMemoryMax should be SUM(MAX(memory, memoryMax)) for all tasks + expectedTotalMemoryMax int + }{ + { + name: "plain no max", + cpu: 100, + memory: 200, + memoryMax: 0, + expectedTotalMemoryMax: 200, + }, + { + name: "with max", + cpu: 100, + memory: 200, + memoryMax: 300, + expectedTotalMemoryMax: 300, + }, + } + + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + job := mock.Job() + job.TaskGroups[0].Count = 1 + + task := job.TaskGroups[0].Tasks[0].Name + res := job.TaskGroups[0].Tasks[0].Resources + res.CPU = c.cpu + res.MemoryMB = c.memory + res.MemoryMaxMB = c.memoryMax + + h := NewHarness(t) + + // Create some nodes + for i := 0; i < 10; i++ { + node := mock.Node() + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + } + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + + require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + require.NoError(t, err) + + require.Len(t, h.Plans, 1) + + out, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false) + require.NoError(t, err) + + // Ensure all allocations placed + require.Len(t, out, 1) + alloc := out[0] + + // checking new resources field deprecated Resources fields + require.Equal(t, int64(c.cpu), alloc.AllocatedResources.Tasks[task].Cpu.CpuShares) + require.Equal(t, int64(c.memory), alloc.AllocatedResources.Tasks[task].Memory.MemoryMB) + require.Equal(t, int64(c.memoryMax), alloc.AllocatedResources.Tasks[task].Memory.MemoryMaxMB) + + // checking old deprecated Resources fields + require.Equal(t, c.cpu, alloc.TaskResources[task].CPU) + require.Equal(t, c.memory, alloc.TaskResources[task].MemoryMB) + require.Equal(t, c.memoryMax, alloc.TaskResources[task].MemoryMaxMB) + + // check total resource fields - alloc.Resources deprecated field, no modern equivalent + require.Equal(t, c.cpu, alloc.Resources.CPU) + require.Equal(t, c.memory, alloc.Resources.MemoryMB) + require.Equal(t, c.expectedTotalMemoryMax, alloc.Resources.MemoryMaxMB) + + }) + } +} + func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) { h := NewHarness(t) @@ -4543,7 +4634,7 @@ func TestBatchSched_ScaleDown_SameName(t *testing.T) { h.AssertEvalStatus(t, structs.EvalStatusComplete) } -func TestGenericSched_AllocFit(t *testing.T) { +func TestGenericSched_AllocFit_Lifecycle(t *testing.T) { testCases := []struct { Name string NodeCpu int64 @@ -4661,6 +4752,51 @@ func TestGenericSched_AllocFit(t *testing.T) { } } +func TestGenericSched_AllocFit_MemoryOversubscription(t *testing.T) { + h := NewHarness(t) + node := mock.Node() + node.NodeResources.Cpu.CpuShares = 10000 + node.NodeResources.Memory.MemoryMB = 1224 + node.ReservedResources.Memory.MemoryMB = 60 + require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node)) + + job := mock.Job() + job.TaskGroups[0].Count = 10 + job.TaskGroups[0].Tasks[0].Resources.CPU = 100 + job.TaskGroups[0].Tasks[0].Resources.MemoryMB = 200 + job.TaskGroups[0].Tasks[0].Resources.MemoryMaxMB = 500 + job.TaskGroups[0].Tasks[0].Resources.DiskMB = 1 + require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), job)) + + // Create a mock evaluation to register the job + eval := &structs.Evaluation{ + Namespace: structs.DefaultNamespace, + ID: uuid.Generate(), + Priority: job.Priority, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: job.ID, + Status: structs.EvalStatusPending, + } + require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval})) + + // Process the evaluation + err := h.Process(NewServiceScheduler, eval) + require.NoError(t, err) + + // expectedAllocs should be floor((nodeResources.MemoryMB-reservedResources.MemoryMB) / job.MemoryMB) + expectedAllocs := 5 + require.Len(t, h.Plans, 1) + + // Lookup the allocations by JobID + ws := memdb.NewWatchSet() + out, err := h.State.AllocsByJob(ws, job.Namespace, job.ID, false) + require.NoError(t, err) + + require.Len(t, out, expectedAllocs) + + h.AssertEvalStatus(t, structs.EvalStatusComplete) +} + func TestGenericSched_ChainedAlloc(t *testing.T) { h := NewHarness(t) diff --git a/scheduler/rank.go b/scheduler/rank.go index 6ac93bfda..7cb7889e3 100644 --- a/scheduler/rank.go +++ b/scheduler/rank.go @@ -304,7 +304,8 @@ OUTER: CpuShares: int64(task.Resources.CPU), }, Memory: structs.AllocatedMemoryResources{ - MemoryMB: int64(task.Resources.MemoryMB), + MemoryMB: int64(task.Resources.MemoryMB), + MemoryMaxMB: int64(task.Resources.MemoryMaxMB), }, } diff --git a/scheduler/util.go b/scheduler/util.go index 2b3ded929..e144fd42e 100644 --- a/scheduler/util.go +++ b/scheduler/util.go @@ -429,6 +429,8 @@ func tasksUpdated(jobA, jobB *structs.Job, taskGroup string) bool { return true } else if ar.MemoryMB != br.MemoryMB { return true + } else if ar.MemoryMaxMB != br.MemoryMaxMB { + return true } else if !ar.Devices.Equals(&br.Devices) { return true }