diff --git a/lib/cpuset/cpuset.go b/lib/cpuset/cpuset.go
index c97f369dc..4f03d3f74 100644
--- a/lib/cpuset/cpuset.go
+++ b/lib/cpuset/cpuset.go
@@ -79,6 +79,15 @@ func (s CPUSet) IsSubsetOf(other CPUSet) bool {
 	return true
 }
 
+func (s CPUSet) IsSupersetOf(other CPUSet) bool {
+	for cpu := range other.cpus {
+		if _, ok := s.cpus[cpu]; !ok {
+			return false
+		}
+	}
+	return true
+}
+
 // Equals tests the equality of the elements in the CPUSet
 func (s CPUSet) Equals(other CPUSet) bool {
 	return reflect.DeepEqual(s.cpus, other.cpus)
diff --git a/lib/cpuset/cpuset_test.go b/lib/cpuset/cpuset_test.go
index f3c58afb2..bd1bc1f7d 100644
--- a/lib/cpuset/cpuset_test.go
+++ b/lib/cpuset/cpuset_test.go
@@ -63,7 +63,7 @@ func TestCPUSet_Equals(t *testing.T) {
 	}
 }
 
-func TestCpuSet_Union(t *testing.T) {
+func TestCPUSet_Union(t *testing.T) {
 	cases := []struct {
 		a        CPUSet
 		b        CPUSet
@@ -86,7 +86,7 @@ func TestCpuSet_Union(t *testing.T) {
 	}
 }
 
-func TestCpuSet_Difference(t *testing.T) {
+func TestCPUSet_Difference(t *testing.T) {
 	cases := []struct {
 		a        CPUSet
 		b        CPUSet
@@ -108,6 +108,44 @@ func TestCpuSet_Difference(t *testing.T) {
 	}
 }
 
+func TestCPUSet_IsSubsetOf(t *testing.T) {
+	cases := []struct {
+		a        CPUSet
+		b        CPUSet
+		isSubset bool
+	}{
+		{New(0), New(0), true},
+		{New(), New(0), true},
+		{New(0), New(), false},
+		{New(1, 2), New(0, 1, 2, 3), true},
+		{New(2, 1), New(0, 1, 2, 3), true},
+		{New(3, 4), New(0, 1, 2, 3), false},
+	}
+
+	for _, c := range cases {
+		require.Equal(t, c.isSubset, c.a.IsSubsetOf(c.b))
+	}
+}
+
+func TestCPUSet_IsSupersetOf(t *testing.T) {
+	cases := []struct {
+		a          CPUSet
+		b          CPUSet
+		isSuperset bool
+	}{
+		{New(0), New(0), true},
+		{New(0), New(), true},
+		{New(), New(0), false},
+		{New(0, 1, 2, 3), New(0), true},
+		{New(0, 1, 2, 3), New(2, 3), true},
+		{New(0, 1, 2, 3), New(2, 3, 4), false},
+	}
+
+	for _, c := range cases {
+		require.Equal(t, c.isSuperset, c.a.IsSupersetOf(c.b))
+	}
+}
+
 func TestParse(t *testing.T) {
 	cases := []struct {
 		cpuset   string
diff --git a/nomad/structs/funcs.go b/nomad/structs/funcs.go
index 0f6e5e70a..ad151bad7 100644
--- a/nomad/structs/funcs.go
+++ b/nomad/structs/funcs.go
@@ -98,6 +98,9 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
 	// Compute the allocs' utilization from zero
 	used := new(ComparableResources)
 
+	reservedCores := map[uint16]struct{}{}
+	var coreOverlap bool
+
 	// For each alloc, add the resources
 	for _, alloc := range allocs {
 		// Do not consider the resource impact of terminal allocations
@@ -105,7 +108,21 @@ func AllocsFit(node *Node, allocs []*Allocation, netIdx *NetworkIndex, checkDevi
 			continue
 		}
 
-		used.Add(alloc.ComparableResources())
+		cr := alloc.ComparableResources()
+		used.Add(cr)
+
+		// Adding the comparable resources unions the reserved core sets, so check whether any core is reserved by more than one alloc
+		for _, core := range cr.Flattened.Cpu.ReservedCores {
+			if _, ok := reservedCores[core]; ok {
+				coreOverlap = true
+			} else {
+				reservedCores[core] = struct{}{}
+			}
+		}
+	}
+
+	if coreOverlap {
+		return false, "cores", used, nil
 	}
 
 	// Check that the node resources (after subtracting reserved) are a
diff --git a/nomad/structs/funcs_test.go b/nomad/structs/funcs_test.go
index 1b3eb346c..13a18b2b7 100644
--- a/nomad/structs/funcs_test.go
+++ b/nomad/structs/funcs_test.go
@@ -269,7 +269,9 @@ func TestAllocsFit(t *testing.T) {
 	n := &Node{
 		NodeResources: &NodeResources{
 			Cpu: NodeCpuResources{
-				CpuShares: 2000,
+				CpuShares:          2000,
+				TotalCpuCores:      2,
+				ReservableCpuCores: []uint16{0, 1},
 			},
 			Memory: NodeMemoryResources{
 				MemoryMB: 2048,
@@ -317,7 +319,8 @@ func TestAllocsFit(t *testing.T) {
 			Tasks: map[string]*AllocatedTaskResources{
 				"web": {
 					Cpu: AllocatedCpuResources{
-						CpuShares: 1000,
+						CpuShares:     1000,
+						ReservedCores: []uint16{},
 					},
 					Memory: AllocatedMemoryResources{
 						MemoryMB: 1024,
@@ -345,9 +348,9 @@ func TestAllocsFit(t *testing.T) {
 	}
 
 	// Should fit one allocation
-	fit, _, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
+	fit, dim, used, err := AllocsFit(n, []*Allocation{a1}, nil, false)
 	require.NoError(err)
-	require.True(fit)
+	require.True(fit, "failed for dimension %q", dim)
 	require.EqualValues(1000, used.Flattened.Cpu.CpuShares)
 	require.EqualValues(1024, used.Flattened.Memory.MemoryMB)
 
@@ -357,6 +360,48 @@ func TestAllocsFit(t *testing.T) {
 	require.False(fit)
 	require.EqualValues(2000, used.Flattened.Cpu.CpuShares)
 	require.EqualValues(2048, used.Flattened.Memory.MemoryMB)
+
+	a2 := &Allocation{
+		AllocatedResources: &AllocatedResources{
+			Tasks: map[string]*AllocatedTaskResources{
+				"web": {
+					Cpu: AllocatedCpuResources{
+						CpuShares:     500,
+						ReservedCores: []uint16{0},
+					},
+					Memory: AllocatedMemoryResources{
+						MemoryMB: 512,
+					},
+				},
+			},
+			Shared: AllocatedSharedResources{
+				DiskMB: 1000,
+				Networks: Networks{
+					{
+						Mode: "host",
+						IP:   "10.0.0.1",
+					},
+				},
+			},
+		},
+	}
+
+	// Should fit one allocation
+	fit, dim, used, err = AllocsFit(n, []*Allocation{a2}, nil, false)
+	require.NoError(err)
+	require.True(fit, "failed for dimension %q", dim)
+	require.EqualValues(500, used.Flattened.Cpu.CpuShares)
+	require.EqualValues([]uint16{0}, used.Flattened.Cpu.ReservedCores)
+	require.EqualValues(512, used.Flattened.Memory.MemoryMB)
+
+	// Should not fit second allocation
+	fit, dim, used, err = AllocsFit(n, []*Allocation{a2, a2}, nil, false)
+	require.NoError(err)
+	require.False(fit)
+	require.EqualValues("cores", dim)
+	require.EqualValues(1000, used.Flattened.Cpu.CpuShares)
+	require.EqualValues([]uint16{0}, used.Flattened.Cpu.ReservedCores)
+	require.EqualValues(1024, used.Flattened.Memory.MemoryMB)
 }
 
 func TestAllocsFit_TerminalAlloc(t *testing.T) {
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index a29b62982..9e86a20bd 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -3844,7 +3844,8 @@ func (c *ComparableResources) Superset(other *ComparableResources) (bool, string
 	if c.Flattened.Cpu.CpuShares < other.Flattened.Cpu.CpuShares {
 		return false, "cpu"
 	}
-	if len(c.Flattened.Cpu.ReservedCores) > 0 && cpuset.New(other.Flattened.Cpu.ReservedCores...).IsSubsetOf(cpuset.New(c.Flattened.Cpu.ReservedCores...)) {
+
+	if len(c.Flattened.Cpu.ReservedCores) > 0 && !cpuset.New(c.Flattened.Cpu.ReservedCores...).IsSupersetOf(cpuset.New(other.Flattened.Cpu.ReservedCores...)) {
 		return false, "cores"
 	}
 	if c.Flattened.Memory.MemoryMB < other.Flattened.Memory.MemoryMB {
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index dd5ea3bf5..30ec486bd 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -6012,6 +6012,69 @@ func TestTaskGroup_validateScriptChecksInGroupServices(t *testing.T) {
 	})
 }
 
+func TestComparableResources_Superset(t *testing.T) {
+	base := &ComparableResources{
+		Flattened: AllocatedTaskResources{
+			Cpu: AllocatedCpuResources{
+				CpuShares:     4000,
+				ReservedCores: []uint16{0, 1, 2, 3},
+			},
+			Memory: AllocatedMemoryResources{MemoryMB: 4096},
+		},
+		Shared: AllocatedSharedResources{DiskMB: 10000},
+	}
+	cases := []struct {
+		a         *ComparableResources
+		b         *ComparableResources
+		dimension string
+	}{
+		{
+			a: base,
+			b: &ComparableResources{
+				Flattened: AllocatedTaskResources{
+					Cpu: AllocatedCpuResources{CpuShares: 1000, ReservedCores: []uint16{0}},
+				},
+			},
+		},
+		{
+			a: base,
+			b: &ComparableResources{
+				Flattened: AllocatedTaskResources{
+					Cpu: AllocatedCpuResources{CpuShares: 4000, ReservedCores: []uint16{0, 1, 2, 3}},
+				},
+			},
+		},
+		{
+			a: base,
+			b: &ComparableResources{
+				Flattened: AllocatedTaskResources{
+					Cpu: AllocatedCpuResources{CpuShares: 5000},
+				},
+			},
+			dimension: "cpu",
+		},
+		{
+			a: base,
+			b: &ComparableResources{
+				Flattened: AllocatedTaskResources{
+					Cpu: AllocatedCpuResources{CpuShares: 1000, ReservedCores: []uint16{3, 4}},
+				},
+			},
+			dimension: "cores",
+		},
+	}
+
+	for _, c := range cases {
+		fit, dim := c.a.Superset(c.b)
+		if c.dimension == "" {
+			require.True(t, fit)
+		} else {
+			require.False(t, fit)
+			require.Equal(t, c.dimension, dim)
+		}
+	}
+}
+
 func requireErrors(t *testing.T, err error, expected ...string) {
 	t.Helper()
 	require.Error(t, err)
diff --git a/scheduler/rank.go b/scheduler/rank.go
index efe586efa..6ac93bfda 100644
--- a/scheduler/rank.go
+++ b/scheduler/rank.go
@@ -4,6 +4,8 @@ import (
 	"fmt"
 	"math"
 
+	"github.com/hashicorp/nomad/lib/cpuset"
+
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -403,6 +405,38 @@ OUTER:
 				}
 			}
 
+			// Check if we need to allocate any reserved cores
+			if task.Resources.Cores > 0 {
+				// set of reservable CPUs for the node
+				nodeCPUSet := cpuset.New(option.Node.NodeResources.Cpu.ReservableCpuCores...)
+				// set of all reserved CPUs on the node
+				allocatedCPUSet := cpuset.New()
+				for _, alloc := range proposed {
+					allocatedCPUSet = allocatedCPUSet.Union(cpuset.New(alloc.ComparableResources().Flattened.Cpu.ReservedCores...))
+				}
+
+				// add any cores that were reserved for other tasks
+				for _, tr := range total.Tasks {
+					allocatedCPUSet = allocatedCPUSet.Union(cpuset.New(tr.Cpu.ReservedCores...))
+				}
+
+				// set of CPUs not yet reserved on the node
+				availableCPUSet := nodeCPUSet.Difference(allocatedCPUSet)
+
+				// If not enough cores are available, mark the node as exhausted
+				if availableCPUSet.Size() < task.Resources.Cores {
+					// TODO preemption
+					iter.ctx.Metrics().ExhaustedNode(option.Node, "cores")
+					continue OUTER
+				}
+
+				// Set the task's reserved cores
+				taskResources.Cpu.ReservedCores = availableCPUSet.ToSlice()[0:task.Resources.Cores]
+				// Total CPU usage on the node is still tracked by CpuShares: even though the task has entire
+				// cores reserved, its overall usage is still accounted for in CPU shares.
+				taskResources.Cpu.CpuShares = option.Node.NodeResources.Cpu.SharesPerCore() * int64(task.Resources.Cores)
+			}
+
 			// Store the task resource
 			option.SetTaskResources(task, taskResources)
 
diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go
index 2cdad5485..5d91dee0f 100644
--- a/scheduler/rank_test.go
+++ b/scheduler/rank_test.go
@@ -589,6 +589,122 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
 	}
 }
 
+func TestBinPackIterator_ReservedCores(t *testing.T) {
+	state, ctx := testContext(t)
+	nodes := []*RankedNode{
+		{
+			Node: &structs.Node{
+				// Perfect fit
+				ID: uuid.Generate(),
+				NodeResources: &structs.NodeResources{
+					Cpu: structs.NodeCpuResources{
+						CpuShares:          2048,
+						TotalCpuCores:      2,
+						ReservableCpuCores: []uint16{0, 1},
+					},
+					Memory: structs.NodeMemoryResources{
+						MemoryMB: 2048,
+					},
+				},
+			},
+		},
+		{
+			Node: &structs.Node{
+				// Perfect fit
+				ID: uuid.Generate(),
+				NodeResources: &structs.NodeResources{
+					Cpu: structs.NodeCpuResources{
+						CpuShares:          2048,
+						TotalCpuCores:      2,
+						ReservableCpuCores: []uint16{0, 1},
+					},
+					Memory: structs.NodeMemoryResources{
+						MemoryMB: 2048,
+					},
+				},
+			},
+		},
+	}
+	static := NewStaticRankIterator(ctx, nodes)
+
+	// Add existing allocations
+	j1, j2 := mock.Job(), mock.Job()
+	alloc1 := &structs.Allocation{
+		Namespace: structs.DefaultNamespace,
+		ID:        uuid.Generate(),
+		EvalID:    uuid.Generate(),
+		NodeID:    nodes[0].Node.ID,
+		JobID:     j1.ID,
+		Job:       j1,
+		AllocatedResources: &structs.AllocatedResources{
+			Tasks: map[string]*structs.AllocatedTaskResources{
+				"web": {
+					Cpu: structs.AllocatedCpuResources{
+						CpuShares:     2048,
+						ReservedCores: []uint16{0, 1},
+					},
+					Memory: structs.AllocatedMemoryResources{
+						MemoryMB: 2048,
+					},
+				},
+			},
+		},
+		DesiredStatus: structs.AllocDesiredStatusRun,
+		ClientStatus:  structs.AllocClientStatusPending,
+		TaskGroup:     "web",
+	}
+	alloc2 := &structs.Allocation{
+		Namespace: structs.DefaultNamespace,
+		ID:        uuid.Generate(),
+		EvalID:    uuid.Generate(),
+		NodeID:    nodes[1].Node.ID,
+		JobID:     j2.ID,
+		Job:       j2,
+		AllocatedResources: &structs.AllocatedResources{
+			Tasks: map[string]*structs.AllocatedTaskResources{
+				"web": {
+					Cpu: structs.AllocatedCpuResources{
+						CpuShares:     1024,
+						ReservedCores: []uint16{0},
+					},
+					Memory: structs.AllocatedMemoryResources{
+						MemoryMB: 1024,
+					},
+				},
+			},
+		},
+		DesiredStatus: structs.AllocDesiredStatusRun,
+		ClientStatus:  structs.AllocClientStatusPending,
+		TaskGroup:     "web",
+	}
+	require.NoError(t, state.UpsertJobSummary(998, mock.JobSummary(alloc1.JobID)))
+	require.NoError(t, state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)))
+	require.NoError(t, state.UpsertAllocs(structs.MsgTypeTestSetup, 1000, []*structs.Allocation{alloc1, alloc2}))
+
+	taskGroup := &structs.TaskGroup{
+		EphemeralDisk: &structs.EphemeralDisk{},
+		Tasks: []*structs.Task{
+			{
+				Name: "web",
+				Resources: &structs.Resources{
+					Cores:    1,
+					MemoryMB: 1024,
+				},
+			},
+		},
+	}
+	binp := NewBinPackIterator(ctx, static, false, 0, structs.SchedulerAlgorithmBinpack)
+	binp.SetTaskGroup(taskGroup)
+
+	scoreNorm := NewScoreNormalizationIterator(ctx, binp)
+
+	out := collectRanked(scoreNorm)
+	require := require.New(t)
+	require.Len(out, 1)
+	require.Equal(nodes[1].Node.ID, out[0].Node.ID)
+	require.Equal([]uint16{1}, out[0].TaskResources["web"].Cpu.ReservedCores)
+}
+
 func TestBinPackIterator_ExistingAlloc(t *testing.T) {
 	state, ctx := testContext(t)
 	nodes := []*RankedNode{