diff --git a/helper/iterator/iterator.go b/helper/iterator/iterator.go
new file mode 100644
index 000000000..8f87c4b7d
--- /dev/null
+++ b/helper/iterator/iterator.go
@@ -0,0 +1,23 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+package iterator
+
+// Iterator represents an object that can iterate over a set of values one at a
+// time.
+type Iterator interface {
+ // Next returns the next element or nil if there are none left.
+ Next() any
+}
+
+// Len consumes the iterator and returns the number of elements found.
+//
+// IMPORTANT: Len consumes the iterator, so the iterator should not be used
+// after Len returns.
+func Len(iter Iterator) int {
+ count := 0
+ for raw := iter.Next(); raw != nil; raw = iter.Next() {
+ count++
+ }
+ return count
+}
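As a quick sketch of how this helper composes with the interface above: the `sliceIterator` type below is hypothetical and exists only for illustration (in the scheduler test added later in this diff, `Len` is fed the `memdb.ResultIterator` returned by `NodesByNodePool`). It assumes the Nomad module is available on the import path.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper/iterator"
)

// sliceIterator is a hypothetical iterator.Iterator backed by a slice.
type sliceIterator struct {
	items []any
}

// Next pops and returns the next element, or nil once the slice is exhausted.
func (s *sliceIterator) Next() any {
	if len(s.items) == 0 {
		return nil
	}
	next := s.items[0]
	s.items = s.items[1:]
	return next
}

func main() {
	iter := &sliceIterator{items: []any{"a", "b", "c"}}

	// Len drains the iterator, so only call it once no further Next calls
	// are needed.
	fmt.Println(iterator.Len(iter)) // 3
	fmt.Println(iter.Next())        // <nil>, already consumed
}
```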
diff --git a/nomad/job_endpoint_hooks.go b/nomad/job_endpoint_hooks.go
index 631d280cd..ee28e19db 100644
--- a/nomad/job_endpoint_hooks.go
+++ b/nomad/job_endpoint_hooks.go
@@ -322,7 +322,12 @@ func (v *memoryOversubscriptionValidate) Validate(job *structs.Job) (warnings []
return nil, err
}
- if c != nil && c.MemoryOversubscriptionEnabled {
+ pool, err := v.srv.State().NodePoolByName(nil, job.NodePool)
+ if err != nil {
+ return nil, err
+ }
+
+ if pool.MemoryOversubscriptionEnabled(c) {
return nil, nil
}
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index e16b5f5e1..cab6dec9e 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -2164,6 +2164,113 @@ func TestJobEndpoint_Register_ValidateMemoryMax(t *testing.T) {
require.Empty(t, resp.Warnings)
}
+func TestJobEndpoint_Register_ValidateMemoryMax_NodePool(t *testing.T) {
+ ci.Parallel(t)
+
+ s, cleanupS := TestServer(t, func(c *Config) {
+ c.NumSchedulers = 0 // Prevent automatic dequeue
+ })
+ defer cleanupS()
+ codec := rpcClient(t, s)
+ testutil.WaitForLeader(t, s.RPC)
+
+ // Store default scheduler configuration to reset between test cases.
+ _, defaultSchedConfig, err := s.State().SchedulerConfig()
+ must.NoError(t, err)
+
+ // Create test node pools.
+ noSchedConfig := mock.NodePool()
+ noSchedConfig.SchedulerConfiguration = nil
+
+ withMemOversub := mock.NodePool()
+ withMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
+ MemoryOversubscriptionEnabled: pointer.Of(true),
+ }
+
+ noMemOversub := mock.NodePool()
+ noMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
+ MemoryOversubscriptionEnabled: pointer.Of(false),
+ }
+
+ s.State().UpsertNodePools(structs.MsgTypeTestSetup, 100, []*structs.NodePool{
+ noSchedConfig,
+ withMemOversub,
+ noMemOversub,
+ })
+
+ testCases := []struct {
+ name string
+ pool string
+ globalConfig *structs.SchedulerConfiguration
+ expectedWarning string
+ }{
+ {
+ name: "no scheduler config uses global config",
+ pool: noSchedConfig.Name,
+ globalConfig: &structs.SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: true,
+ },
+ expectedWarning: "",
+ },
+ {
+ name: "enabled via node pool",
+ pool: withMemOversub.Name,
+ globalConfig: &structs.SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: false,
+ },
+ expectedWarning: "",
+ },
+ {
+ name: "disabled via node pool",
+ pool: noMemOversub.Name,
+ globalConfig: &structs.SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: true,
+ },
+ expectedWarning: "Memory oversubscription is not enabled",
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ // Set global scheduler config if provided.
+ if tc.globalConfig != nil {
+ idx, err := s.State().LatestIndex()
+ must.NoError(t, err)
+
+ err = s.State().SchedulerSetConfig(idx, tc.globalConfig)
+ must.NoError(t, err)
+ }
+
+ // Create job with node_pool and memory_max.
+ job := mock.Job()
+ job.TaskGroups[0].Tasks[0].Resources.MemoryMaxMB = 2000
+ job.NodePool = tc.pool
+
+ req := &structs.JobRegisterRequest{
+ Job: job,
+ WriteRequest: structs.WriteRequest{
+ Region: "global",
+ Namespace: job.Namespace,
+ },
+ }
+ var resp structs.JobRegisterResponse
+ err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)
+
+ // Validate response.
+ must.NoError(t, err)
+ if tc.expectedWarning != "" {
+ must.StrContains(t, resp.Warnings, tc.expectedWarning)
+ } else {
+ must.Eq(t, "", resp.Warnings)
+ }
+
+ // Reset to default global scheduler config.
+ err = s.State().SchedulerSetConfig(resp.Index+1, defaultSchedConfig)
+ must.NoError(t, err)
+ })
+ }
+}
+
// evalUpdateFromRaft searches the raft logs for the eval update pertaining to the eval
func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluation {
var store raft.LogStore = s.raftInmem
diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go
index e706b4ff2..babc3e1ed 100644
--- a/nomad/structs/node_pool.go
+++ b/nomad/structs/node_pool.go
@@ -117,6 +117,24 @@ func (n *NodePool) IsBuiltIn() bool {
}
}
+// MemoryOversubscriptionEnabled returns true if memory oversubscription is
+// enabled in the node pool or in the global cluster configuration.
+func (n *NodePool) MemoryOversubscriptionEnabled(global *SchedulerConfiguration) bool {
+
+ // Default to the global scheduler config.
+ memOversubEnabled := global != nil && global.MemoryOversubscriptionEnabled
+
+ // But overwrite it if the node pool also has it configured.
+ poolHasMemOversub := n != nil &&
+ n.SchedulerConfiguration != nil &&
+ n.SchedulerConfiguration.MemoryOversubscriptionEnabled != nil
+ if poolHasMemOversub {
+ memOversubEnabled = *n.SchedulerConfiguration.MemoryOversubscriptionEnabled
+ }
+
+ return memOversubEnabled
+}
+
// SetHash is used to compute and set the hash of node pool
func (n *NodePool) SetHash() []byte {
// Initialize a 256bit Blake2 hash (32 bytes)
@@ -163,6 +181,9 @@ func (n *NodePool) SetHash() []byte {
// NodePoolSchedulerConfiguration is the scheduler confinguration applied to a
// node pool.
+//
+// When adding new values that should override global scheduler configuration,
+// verify the scheduler handles the node pool configuration as well.
type NodePoolSchedulerConfiguration struct {
// SchedulerAlgorithm is the scheduling algorithm to use for the pool.
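The precedence implemented by `MemoryOversubscriptionEnabled` can be illustrated with a small sketch; it assumes the `structs` and `pointer` packages from this repository are importable, and the pool names are arbitrary. The table-driven test in `node_pool_test.go` below covers the same cases exhaustively.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/helper/pointer"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	global := &structs.SchedulerConfiguration{MemoryOversubscriptionEnabled: true}

	// A pool without its own scheduler configuration inherits the global value.
	inherits := &structs.NodePool{Name: "inherits-global"}
	fmt.Println(inherits.MemoryOversubscriptionEnabled(global)) // true

	// A pool that sets the value explicitly overrides the global configuration.
	strict := &structs.NodePool{
		Name: "no-oversub",
		SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{
			MemoryOversubscriptionEnabled: pointer.Of(false),
		},
	}
	fmt.Println(strict.MemoryOversubscriptionEnabled(global)) // false
}
```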
diff --git a/nomad/structs/node_pool_test.go b/nomad/structs/node_pool_test.go
index f261c49c6..dd04df74c 100644
--- a/nomad/structs/node_pool_test.go
+++ b/nomad/structs/node_pool_test.go
@@ -127,3 +127,70 @@ func TestNodePool_IsBuiltIn(t *testing.T) {
})
}
}
+
+func TestNodePool_MemoryOversubscriptionEnabled(t *testing.T) {
+ ci.Parallel(t)
+
+ testCases := []struct {
+ name string
+ pool *NodePool
+ global *SchedulerConfiguration
+ expected bool
+ }{
+ {
+ name: "global used if pool is nil",
+ pool: nil,
+ global: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: true,
+ },
+ expected: true,
+ },
+ {
+ name: "global used if pool doesn't have scheduler config",
+ pool: &NodePool{},
+ global: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: true,
+ },
+ expected: true,
+ },
+ {
+ name: "global used if pool doesn't specify memory oversub",
+ pool: &NodePool{
+ SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
+ },
+ global: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: true,
+ },
+ expected: true,
+ },
+ {
+ name: "pool overrides global if it defines memory oversub",
+ pool: &NodePool{
+ SchedulerConfiguration: &NodePoolSchedulerConfiguration{
+ MemoryOversubscriptionEnabled: pointer.Of(false),
+ },
+ },
+ global: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: true,
+ },
+ expected: false,
+ },
+ {
+ name: "pool used if global is nil",
+ pool: &NodePool{
+ SchedulerConfiguration: &NodePoolSchedulerConfiguration{
+ MemoryOversubscriptionEnabled: pointer.Of(true),
+ },
+ },
+ global: nil,
+ expected: true,
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ got := tc.pool.MemoryOversubscriptionEnabled(tc.global)
+ must.Eq(t, tc.expected, got)
+ })
+ }
+}
diff --git a/nomad/structs/operator.go b/nomad/structs/operator.go
index a14e4ccaf..91184024e 100644
--- a/nomad/structs/operator.go
+++ b/nomad/structs/operator.go
@@ -196,6 +196,26 @@ func (s *SchedulerConfiguration) EffectiveSchedulerAlgorithm() SchedulerAlgorith
return s.SchedulerAlgorithm
}
+// WithNodePool returns a new SchedulerConfiguration with the node pool
+// scheduler configuration applied.
+func (s *SchedulerConfiguration) WithNodePool(pool *NodePool) *SchedulerConfiguration {
+ schedConfig := s.Copy()
+
+ if pool == nil || pool.SchedulerConfiguration == nil {
+ return schedConfig
+ }
+
+ poolConfig := pool.SchedulerConfiguration
+ if poolConfig.SchedulerAlgorithm != "" {
+ schedConfig.SchedulerAlgorithm = poolConfig.SchedulerAlgorithm
+ }
+ if poolConfig.MemoryOversubscriptionEnabled != nil {
+ schedConfig.MemoryOversubscriptionEnabled = *poolConfig.MemoryOversubscriptionEnabled
+ }
+
+ return schedConfig
+}
+
func (s *SchedulerConfiguration) Canonicalize() {
if s != nil && s.SchedulerAlgorithm == "" {
s.SchedulerAlgorithm = SchedulerAlgorithmBinpack
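A brief sketch of how `WithNodePool` might be used by a caller: only the fields the pool sets are overridden, and a copy is returned so the configuration held in state is not mutated. The pool name here is arbitrary; the new `operator_test.go` below exercises these cases in full.

```go
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	global := &structs.SchedulerConfiguration{
		SchedulerAlgorithm:            structs.SchedulerAlgorithmBinpack,
		MemoryOversubscriptionEnabled: false,
	}

	pool := &structs.NodePool{
		Name: "spread-pool",
		SchedulerConfiguration: &structs.NodePoolSchedulerConfiguration{
			SchedulerAlgorithm: structs.SchedulerAlgorithmSpread,
		},
	}

	// Merge the node pool values over the global configuration.
	merged := global.WithNodePool(pool)
	fmt.Println(merged.SchedulerAlgorithm)            // spread
	fmt.Println(merged.MemoryOversubscriptionEnabled) // false (pool did not set it)
	fmt.Println(global.SchedulerAlgorithm)            // binpack (original copy untouched)
}
```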
diff --git a/nomad/structs/operator_test.go b/nomad/structs/operator_test.go
new file mode 100644
index 000000000..229fdb050
--- /dev/null
+++ b/nomad/structs/operator_test.go
@@ -0,0 +1,108 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+package structs
+
+import (
+ "testing"
+
+ "github.com/hashicorp/nomad/ci"
+ "github.com/hashicorp/nomad/helper/pointer"
+ "github.com/shoenig/test/must"
+)
+
+func TestSchedulerConfiguration_WithNodePool(t *testing.T) {
+ ci.Parallel(t)
+
+ testCases := []struct {
+ name string
+ schedConfig *SchedulerConfiguration
+ pool *NodePool
+ expected *SchedulerConfiguration
+ }{
+ {
+ name: "nil pool returns same config",
+ schedConfig: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: false,
+ SchedulerAlgorithm: SchedulerAlgorithmSpread,
+ },
+ pool: nil,
+ expected: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: false,
+ SchedulerAlgorithm: SchedulerAlgorithmSpread,
+ },
+ },
+ {
+ name: "nil pool scheduler config returns same config",
+ schedConfig: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: false,
+ SchedulerAlgorithm: SchedulerAlgorithmSpread,
+ },
+ pool: &NodePool{},
+ expected: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: false,
+ SchedulerAlgorithm: SchedulerAlgorithmSpread,
+ },
+ },
+ {
+ name: "pool with memory oversubscription overwrites config",
+ schedConfig: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: false,
+ },
+ pool: &NodePool{
+ SchedulerConfiguration: &NodePoolSchedulerConfiguration{
+ MemoryOversubscriptionEnabled: pointer.Of(true),
+ },
+ },
+ expected: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: true,
+ },
+ },
+ {
+ name: "pool with scheduler algorithm overwrites config",
+ schedConfig: &SchedulerConfiguration{
+ SchedulerAlgorithm: SchedulerAlgorithmBinpack,
+ },
+ pool: &NodePool{
+ SchedulerConfiguration: &NodePoolSchedulerConfiguration{
+ SchedulerAlgorithm: SchedulerAlgorithmSpread,
+ },
+ },
+ expected: &SchedulerConfiguration{
+ SchedulerAlgorithm: SchedulerAlgorithmSpread,
+ },
+ },
+ {
+ name: "pool without memory oversubscription does not modify config",
+ schedConfig: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: false,
+ },
+ pool: &NodePool{
+ SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
+ },
+ expected: &SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: false,
+ },
+ },
+ {
+ name: "pool without scheduler algorithm does not modify config",
+ schedConfig: &SchedulerConfiguration{
+ SchedulerAlgorithm: SchedulerAlgorithmSpread,
+ },
+ pool: &NodePool{
+ SchedulerConfiguration: &NodePoolSchedulerConfiguration{},
+ },
+ expected: &SchedulerConfiguration{
+ SchedulerAlgorithm: SchedulerAlgorithmSpread,
+ },
+ },
+ }
+
+ for _, tc := range testCases {
+ t.Run(tc.name, func(t *testing.T) {
+ got := tc.schedConfig.WithNodePool(tc.pool)
+ must.Eq(t, tc.expected, got)
+ must.NotEqOp(t, tc.schedConfig, got)
+ })
+ }
+}
diff --git a/scheduler/generic_sched.go b/scheduler/generic_sched.go
index 3d04d9c5b..e5af34b1d 100644
--- a/scheduler/generic_sched.go
+++ b/scheduler/generic_sched.go
@@ -13,6 +13,7 @@ import (
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/go-version"
+ "github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -281,7 +282,7 @@ func (s *GenericScheduler) process() (bool, error) {
// Construct the placement stack
s.stack = NewGenericStack(s.batch, s.ctx)
if !s.job.Stopped() {
- s.stack.SetJob(s.job)
+ s.setJob(s.job)
}
// Compute the target job allocations
@@ -509,7 +510,7 @@ func (s *GenericScheduler) downgradedJobForPlacement(p placementResult) (string,
// destructive updates to place and the set of new placements to place.
func (s *GenericScheduler) computePlacements(destructive, place []placementResult) error {
// Get the base nodes
- nodes, _, byDC, err := readyNodesInDCsAndPool(s.state, s.job.Datacenters, s.job.NodePool)
+ nodes, byDC, err := s.setNodes(s.job)
if err != nil {
return err
}
@@ -562,10 +563,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
continue
}
- // Use downgraded job in scheduling stack to honor
- // old job resources and constraints
+ // Use downgraded job in scheduling stack to honor old job
+ // resources, constraints, and node pool scheduler configuration.
if downgradedJob != nil {
- s.stack.SetJob(downgradedJob)
+ s.setJob(downgradedJob)
+
+ if needsToSetNodes(downgradedJob, s.job) {
+ nodes, byDC, err = s.setNodes(downgradedJob)
+ if err != nil {
+ return err
+ }
+ }
}
// Find the preferred node
@@ -596,9 +604,17 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
// Compute top K scoring node metadata
s.ctx.Metrics().PopulateScoreMetaData()
- // Restore stack job now that placement is done, to use plan job version
+ // Restore stack job and nodes now that placement is done, to use
+ // plan job version
if downgradedJob != nil {
- s.stack.SetJob(s.job)
+ s.setJob(s.job)
+
+ if needsToSetNodes(downgradedJob, s.job) {
+ nodes, byDC, err = s.setNodes(s.job)
+ if err != nil {
+ return err
+ }
+ }
}
// Set fields based on if we found an allocation option
@@ -690,6 +706,45 @@ func (s *GenericScheduler) computePlacements(destructive, place []placementResul
return nil
}
+// setJob updates the stack with the given job and the job's node pool
+// scheduler configuration.
+func (s *GenericScheduler) setJob(job *structs.Job) error {
+ // Fetch node pool and global scheduler configuration to determine how to
+ // configure the scheduler.
+ pool, err := s.state.NodePoolByName(nil, job.NodePool)
+ if err != nil {
+ return fmt.Errorf("failed to get job node pool %q: %v", job.NodePool, err)
+ }
+
+ _, schedConfig, err := s.state.SchedulerConfig()
+ if err != nil {
+ return fmt.Errorf("failed to get scheduler configuration: %v", err)
+ }
+
+ s.stack.SetJob(job)
+ s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool))
+ return nil
+}
+
+// setNodes updates the stack with the nodes that are ready for placement for
+// the given job.
+func (s *GenericScheduler) setNodes(job *structs.Job) ([]*structs.Node, map[string]int, error) {
+ nodes, _, byDC, err := readyNodesInDCsAndPool(s.state, job.Datacenters, job.NodePool)
+ if err != nil {
+ return nil, nil, err
+ }
+
+ s.stack.SetNodes(nodes)
+ return nodes, byDC, nil
+}
+
+// needsToSetNodes returns true if jobs a and b changed in a way that requires
+// the nodes to be reset.
+func needsToSetNodes(a, b *structs.Job) bool {
+ return !helper.SliceSetEq(a.Datacenters, b.Datacenters) ||
+ a.NodePool != b.NodePool
+}
+
// propagateTaskState copies task handles from previous allocations to
// replacement allocations when the previous allocation is being drained or was
// lost. Remote task drivers rely on this to reconnect to remote tasks when the
@@ -818,6 +873,11 @@ func (s *GenericScheduler) selectNextOption(tg *structs.TaskGroup, selectOptions
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
// Check if preemption is enabled, defaults to true
+ //
+ // The scheduler configuration is read directly from state but only
+ // values that can't be specified per node pool should be used. Other
+ // values must be merged by calling schedConfig.WithNodePool() and set in
+ // the stack by calling SetSchedulerConfiguration().
enablePreemption := true
if schedConfig != nil {
if s.job.Type == structs.JobTypeBatch {
diff --git a/scheduler/generic_sched_test.go b/scheduler/generic_sched_test.go
index 20bbbb462..3c9a833c1 100644
--- a/scheduler/generic_sched_test.go
+++ b/scheduler/generic_sched_test.go
@@ -124,119 +124,6 @@ func TestServiceSched_JobRegister(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
-func TestServiceSched_JobRegister_MemoryMaxHonored(t *testing.T) {
- ci.Parallel(t)
-
- cases := []struct {
- name string
- cpu int
- memory int
- memoryMax int
- memoryOversubscriptionEnabled bool
-
- expectedTaskMemoryMax int
- // expectedTotalMemoryMax should be SUM(MAX(memory, memoryMax)) for all tasks
- expectedTotalMemoryMax int
- }{
- {
- name: "plain no max",
- cpu: 100,
- memory: 200,
- memoryMax: 0,
- memoryOversubscriptionEnabled: true,
-
- expectedTaskMemoryMax: 0,
- expectedTotalMemoryMax: 200,
- },
- {
- name: "with max",
- cpu: 100,
- memory: 200,
- memoryMax: 300,
- memoryOversubscriptionEnabled: true,
-
- expectedTaskMemoryMax: 300,
- expectedTotalMemoryMax: 300,
- },
- {
- name: "with max but disabled",
- cpu: 100,
- memory: 200,
- memoryMax: 300,
-
- memoryOversubscriptionEnabled: false,
- expectedTaskMemoryMax: 0,
- expectedTotalMemoryMax: 200, // same as no max
- },
- }
-
- for _, c := range cases {
- t.Run(c.name, func(t *testing.T) {
- job := mock.Job()
- job.TaskGroups[0].Count = 1
-
- task := job.TaskGroups[0].Tasks[0].Name
- res := job.TaskGroups[0].Tasks[0].Resources
- res.CPU = c.cpu
- res.MemoryMB = c.memory
- res.MemoryMaxMB = c.memoryMax
-
- h := NewHarness(t)
- h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
- MemoryOversubscriptionEnabled: c.memoryOversubscriptionEnabled,
- })
-
- // Create some nodes
- for i := 0; i < 10; i++ {
- node := mock.Node()
- require.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
- }
- require.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
-
- // Create a mock evaluation to register the job
- eval := &structs.Evaluation{
- Namespace: structs.DefaultNamespace,
- ID: uuid.Generate(),
- Priority: job.Priority,
- TriggeredBy: structs.EvalTriggerJobRegister,
- JobID: job.ID,
- Status: structs.EvalStatusPending,
- }
-
- require.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
-
- // Process the evaluation
- err := h.Process(NewServiceScheduler, eval)
- require.NoError(t, err)
-
- require.Len(t, h.Plans, 1)
-
- out, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
- require.NoError(t, err)
-
- // Ensure all allocations placed
- require.Len(t, out, 1)
- alloc := out[0]
-
- // checking new resources field deprecated Resources fields
- require.Equal(t, int64(c.cpu), alloc.AllocatedResources.Tasks[task].Cpu.CpuShares)
- require.Equal(t, int64(c.memory), alloc.AllocatedResources.Tasks[task].Memory.MemoryMB)
- require.Equal(t, int64(c.expectedTaskMemoryMax), alloc.AllocatedResources.Tasks[task].Memory.MemoryMaxMB)
-
- // checking old deprecated Resources fields
- require.Equal(t, c.cpu, alloc.TaskResources[task].CPU)
- require.Equal(t, c.memory, alloc.TaskResources[task].MemoryMB)
- require.Equal(t, c.expectedTaskMemoryMax, alloc.TaskResources[task].MemoryMaxMB)
-
- // check total resource fields - alloc.Resources deprecated field, no modern equivalent
- require.Equal(t, c.cpu, alloc.Resources.CPU)
- require.Equal(t, c.memory, alloc.Resources.MemoryMB)
- require.Equal(t, c.expectedTotalMemoryMax, alloc.Resources.MemoryMaxMB)
-
- })
- }
-}
-
func TestServiceSched_JobRegister_StickyAllocs(t *testing.T) {
ci.Parallel(t)
@@ -854,6 +741,140 @@ func TestServiceSched_Spread(t *testing.T) {
}
}
+// TestServiceSched_JobRegister_NodePool_Downgrade tests the case where an
+// allocation fails during a deployment with canaries and the job is updated
+// to a different node pool. The replacement for the failed alloc should be
+// placed in the node pool of the original job version.
+func TestServiceSched_JobRegister_NodePool_Downgrade(t *testing.T) {
+ ci.Parallel(t)
+
+ h := NewHarness(t)
+
+ // Set global scheduler configuration.
+ h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
+ SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
+ })
+
+ // Create test node pools with different scheduler algorithms.
+ poolBinpack := mock.NodePool()
+ poolBinpack.Name = "pool-binpack"
+ poolBinpack.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
+ SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
+ }
+
+ poolSpread := mock.NodePool()
+ poolSpread.Name = "pool-spread"
+ poolSpread.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
+ SchedulerAlgorithm: structs.SchedulerAlgorithmSpread,
+ }
+
+ nodePools := []*structs.NodePool{
+ poolBinpack,
+ poolSpread,
+ }
+ h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools)
+
+ // Create 5 nodes in each node pool.
+ // Use two loops so nodes are separated by node pool.
+ nodes := []*structs.Node{}
+ for i := 0; i < 5; i++ {
+ node := mock.Node()
+ node.Name = fmt.Sprintf("node-binpack-%d", i)
+ node.NodePool = poolBinpack.Name
+ nodes = append(nodes, node)
+ must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+ }
+ for i := 0; i < 5; i++ {
+ node := mock.Node()
+ node.Name = fmt.Sprintf("node-spread-%d", i)
+ node.NodePool = poolSpread.Name
+ nodes = append(nodes, node)
+ must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+ }
+
+ // Create first version of the test job running in the binpack node pool.
+ job1 := mock.Job()
+ job1.Version = 1
+ job1.NodePool = poolBinpack.Name
+ job1.Status = structs.JobStatusRunning
+ job1.TaskGroups[0].Count = 3
+ job1.TaskGroups[0].Update = &structs.UpdateStrategy{
+ Stagger: time.Duration(30 * time.Second),
+ MaxParallel: 1,
+ HealthCheck: "checks",
+ MinHealthyTime: time.Duration(30 * time.Second),
+ HealthyDeadline: time.Duration(9 * time.Minute),
+ ProgressDeadline: time.Duration(10 * time.Minute),
+ AutoRevert: true,
+ Canary: 1,
+ }
+ must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job1))
+
+ // Create allocs for this job version with one being a canary and another
+ // marked as failed.
+ allocs := []*structs.Allocation{}
+ for i := 0; i < 3; i++ {
+ alloc := mock.Alloc()
+ alloc.Job = job1
+ alloc.JobID = job1.ID
+ alloc.NodeID = nodes[i].ID
+ alloc.DeploymentStatus = &structs.AllocDeploymentStatus{
+ Healthy: pointer.Of(true),
+ Timestamp: time.Now(),
+ Canary: false,
+ ModifyIndex: h.NextIndex(),
+ }
+ if i == 0 {
+ alloc.DeploymentStatus.Canary = true
+ }
+ if i == 1 {
+ alloc.ClientStatus = structs.AllocClientStatusFailed
+ }
+ allocs = append(allocs, alloc)
+ }
+ must.NoError(t, h.State.UpsertAllocs(structs.MsgTypeTestSetup, h.NextIndex(), allocs))
+
+ // Update job to place it in the spread node pool.
+ job2 := job1.Copy()
+ job2.Version = 2
+ job2.NodePool = poolSpread.Name
+ must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job2))
+
+ eval := &structs.Evaluation{
+ Namespace: job2.Namespace,
+ ID: uuid.Generate(),
+ Priority: job2.Priority,
+ TriggeredBy: structs.EvalTriggerJobRegister,
+ JobID: job2.ID,
+ Status: structs.EvalStatusPending,
+ }
+ must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+ processErr := h.Process(NewServiceScheduler, eval)
+ require.NoError(t, processErr, "failed to process eval")
+ require.Len(t, h.Plans, 1)
+
+ // Verify the plan places the new allocation in the spread node pool and
+ // the replacement for the failed alloc in the binpack node pool.
+ for nodeID, allocs := range h.Plans[0].NodeAllocation {
+ var node *structs.Node
+ for _, n := range nodes {
+ if n.ID == nodeID {
+ node = n
+ break
+ }
+ }
+
+ must.Len(t, 1, allocs)
+ alloc := allocs[0]
+ must.Eq(t, alloc.Job.NodePool, node.NodePool, must.Sprintf(
+ "alloc for job in node pool %q placed in node in node pool %q",
+ alloc.Job.NodePool,
+ node.NodePool,
+ ))
+ }
+}
+
// Test job registration with even spread across dc
func TestServiceSched_EvenSpread(t *testing.T) {
ci.Parallel(t)
@@ -1336,6 +1357,168 @@ func TestServiceSched_JobRegister_FeasibleAndInfeasibleTG(t *testing.T) {
h.AssertEvalStatus(t, structs.EvalStatusComplete)
}
+func TestServiceSched_JobRegister_SchedulerAlgorithm(t *testing.T) {
+ ci.Parallel(t)
+
+ // Test node pools.
+ poolNoSchedConfig := mock.NodePool()
+ poolNoSchedConfig.SchedulerConfiguration = nil
+
+ poolBinpack := mock.NodePool()
+ poolBinpack.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
+ SchedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
+ }
+
+ poolSpread := mock.NodePool()
+ poolSpread.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
+ SchedulerAlgorithm: structs.SchedulerAlgorithmSpread,
+ }
+
+ testCases := []struct {
+ name string
+ nodePool string
+ schedulerAlgorithm structs.SchedulerAlgorithm
+ expectedAlgorithm structs.SchedulerAlgorithm
+ }{
+ {
+ name: "global binpack",
+ nodePool: poolNoSchedConfig.Name,
+ schedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
+ expectedAlgorithm: structs.SchedulerAlgorithmBinpack,
+ },
+ {
+ name: "global spread",
+ nodePool: poolNoSchedConfig.Name,
+ schedulerAlgorithm: structs.SchedulerAlgorithmSpread,
+ expectedAlgorithm: structs.SchedulerAlgorithmSpread,
+ },
+ {
+ name: "node pool binpack overrides global config",
+ nodePool: poolBinpack.Name,
+ schedulerAlgorithm: structs.SchedulerAlgorithmSpread,
+ expectedAlgorithm: structs.SchedulerAlgorithmBinpack,
+ },
+ {
+ name: "node pool spread overrides global config",
+ nodePool: poolSpread.Name,
+ schedulerAlgorithm: structs.SchedulerAlgorithmBinpack,
+ expectedAlgorithm: structs.SchedulerAlgorithmSpread,
+ },
+ }
+
+ jobTypes := []string{
+ "batch",
+ "service",
+ }
+
+ for _, jobType := range jobTypes {
+ for _, tc := range testCases {
+ t.Run(fmt.Sprintf("%s/%s", jobType, tc.name), func(t *testing.T) {
+ h := NewHarness(t)
+
+ // Create node pools.
+ nodePools := []*structs.NodePool{
+ poolNoSchedConfig,
+ poolBinpack,
+ poolSpread,
+ }
+ h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools)
+
+ // Create two test nodes. Use two to prevent flakiness due to
+ // the scheduler shuffling nodes.
+ for i := 0; i < 2; i++ {
+ node := mock.Node()
+ node.NodePool = tc.nodePool
+ must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+ }
+
+ // Set global scheduler configuration.
+ h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
+ SchedulerAlgorithm: tc.schedulerAlgorithm,
+ })
+
+ // Create test job.
+ var job *structs.Job
+ switch jobType {
+ case "batch":
+ job = mock.BatchJob()
+ case "service":
+ job = mock.Job()
+ }
+ job.TaskGroups[0].Count = 1
+ job.NodePool = tc.nodePool
+ must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+ // Register an existing job.
+ existingJob := mock.Job()
+ existingJob.TaskGroups[0].Count = 1
+ existingJob.NodePool = tc.nodePool
+ must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, existingJob))
+
+ // Process eval for existing job to place an existing alloc.
+ eval := &structs.Evaluation{
+ Namespace: structs.DefaultNamespace,
+ ID: uuid.Generate(),
+ Priority: existingJob.Priority,
+ TriggeredBy: structs.EvalTriggerJobRegister,
+ JobID: existingJob.ID,
+ Status: structs.EvalStatusPending,
+ }
+ must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+ var scheduler Factory
+ switch jobType {
+ case "batch":
+ scheduler = NewBatchScheduler
+ case "service":
+ scheduler = NewServiceScheduler
+ }
+ err := h.Process(scheduler, eval)
+ must.NoError(t, err)
+
+ must.Len(t, 1, h.Plans)
+ allocs, err := h.State.AllocsByJob(nil, existingJob.Namespace, existingJob.ID, false)
+ must.NoError(t, err)
+ must.Len(t, 1, allocs)
+
+ // Process eval for test job.
+ eval = &structs.Evaluation{
+ Namespace: structs.DefaultNamespace,
+ ID: uuid.Generate(),
+ Priority: job.Priority,
+ TriggeredBy: structs.EvalTriggerJobRegister,
+ JobID: job.ID,
+ Status: structs.EvalStatusPending,
+ }
+ must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+ err = h.Process(scheduler, eval)
+ must.NoError(t, err)
+
+ must.Len(t, 2, h.Plans)
+ allocs, err = h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
+ must.NoError(t, err)
+ must.Len(t, 1, allocs)
+
+ // Expect the new alloc to be placed either on the empty node or on
+ // the node with the existing alloc, depending on the expected
+ // scheduler algorithm.
+ var expectedAllocCount int
+ switch tc.expectedAlgorithm {
+ case structs.SchedulerAlgorithmSpread:
+ expectedAllocCount = 1
+ case structs.SchedulerAlgorithmBinpack:
+ expectedAllocCount = 2
+ }
+
+ alloc := allocs[0]
+ nodeAllocs, err := h.State.AllocsByNode(nil, alloc.NodeID)
+ must.NoError(t, err)
+ must.Len(t, expectedAllocCount, nodeAllocs)
+ })
+ }
+ }
+}
+
// This test just ensures the scheduler handles the eval type to avoid
// regressions.
func TestServiceSched_EvaluateMaxPlanEval(t *testing.T) {
diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go
index cba1da068..ee529595f 100644
--- a/scheduler/preemption_test.go
+++ b/scheduler/preemption_test.go
@@ -1356,10 +1356,11 @@ func TestPreemption(t *testing.T) {
ctx.plan.NodePreemptions[node.ID] = tc.currentPreemptions
}
static := NewStaticRankIterator(ctx, nodes)
- binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority, testSchedulerConfig)
+ binPackIter := NewBinPackIterator(ctx, static, true, tc.jobPriority)
job := mock.Job()
job.Priority = tc.jobPriority
binPackIter.SetJob(job)
+ binPackIter.SetSchedulerConfiguration(testSchedulerConfig)
taskGroup := &structs.TaskGroup{
EphemeralDisk: &structs.EphemeralDisk{},
diff --git a/scheduler/rank.go b/scheduler/rank.go
index 767e16a32..0b960c39b 100644
--- a/scheduler/rank.go
+++ b/scheduler/rank.go
@@ -164,24 +164,18 @@ type BinPackIterator struct {
// NewBinPackIterator returns a BinPackIterator which tries to fit tasks
// potentially evicting other tasks based on a given priority.
-func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int, schedConfig *structs.SchedulerConfiguration) *BinPackIterator {
- algorithm := schedConfig.EffectiveSchedulerAlgorithm()
- scoreFn := structs.ScoreFitBinPack
- if algorithm == structs.SchedulerAlgorithmSpread {
- scoreFn = structs.ScoreFitSpread
- }
-
- iter := &BinPackIterator{
- ctx: ctx,
- source: source,
- evict: evict,
- priority: priority,
- memoryOversubscription: schedConfig != nil && schedConfig.MemoryOversubscriptionEnabled,
- scoreFit: scoreFn,
- }
- iter.ctx.Logger().Named("binpack").Trace("NewBinPackIterator created", "algorithm", algorithm)
- return iter
+func NewBinPackIterator(ctx Context, source RankIterator, evict bool, priority int) *BinPackIterator {
+ return &BinPackIterator{
+ ctx: ctx,
+ source: source,
+ evict: evict,
+ priority: priority,
+ // These are default values that may be overwritten by
+ // SetSchedulerConfiguration.
+ memoryOversubscription: false,
+ scoreFit: structs.ScoreFitBinPack,
+ }
}
func (iter *BinPackIterator) SetJob(job *structs.Job) {
@@ -193,6 +187,19 @@ func (iter *BinPackIterator) SetTaskGroup(taskGroup *structs.TaskGroup) {
iter.taskGroup = taskGroup
}
+func (iter *BinPackIterator) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) {
+ // Set scoring function.
+ algorithm := schedConfig.EffectiveSchedulerAlgorithm()
+ scoreFn := structs.ScoreFitBinPack
+ if algorithm == structs.SchedulerAlgorithmSpread {
+ scoreFn = structs.ScoreFitSpread
+ }
+ iter.scoreFit = scoreFn
+
+ // Set memory oversubscription.
+ iter.memoryOversubscription = schedConfig != nil && schedConfig.MemoryOversubscriptionEnabled
+}
+
func (iter *BinPackIterator) Next() *RankedNode {
OUTER:
for {
diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go
index 00005cc75..dce4ced9c 100644
--- a/scheduler/rank_test.go
+++ b/scheduler/rank_test.go
@@ -115,8 +115,9 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -228,8 +229,9 @@ func TestBinPackIterator_NoExistingAlloc_MixedReserve(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -349,8 +351,9 @@ func TestBinPackIterator_Network_Success(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -481,8 +484,9 @@ func TestBinPackIterator_Network_Failure(t *testing.T) {
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -575,8 +579,9 @@ func TestBinPackIterator_Network_NoCollision_Node(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
@@ -674,8 +679,9 @@ func TestBinPackIterator_Network_NodeError(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
@@ -801,8 +807,9 @@ func TestBinPackIterator_Network_PortCollision_Alloc(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
out := collectRanked(scoreNorm)
@@ -944,8 +951,9 @@ func TestBinPackIterator_Network_Interpolation_Success(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -1055,8 +1063,9 @@ func TestBinPackIterator_Host_Network_Interpolation_Absent_Value(t *testing.T) {
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -1156,8 +1165,9 @@ func TestBinPackIterator_Host_Network_Interpolation_Interface_Not_Exists(t *test
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -1250,8 +1260,9 @@ func TestBinPackIterator_PlannedAlloc(t *testing.T) {
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -1372,8 +1383,9 @@ func TestBinPackIterator_ReservedCores(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -1482,8 +1494,9 @@ func TestBinPackIterator_ExistingAlloc(t *testing.T) {
},
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -1602,8 +1615,9 @@ func TestBinPackIterator_ExistingAlloc_PlannedEvict(t *testing.T) {
},
}
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(taskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
scoreNorm := NewScoreNormalizationIterator(ctx, binp)
@@ -1907,8 +1921,9 @@ func TestBinPackIterator_Devices(t *testing.T) {
}
static := NewStaticRankIterator(ctx, []*RankedNode{{Node: c.Node}})
- binp := NewBinPackIterator(ctx, static, false, 0, testSchedulerConfig)
+ binp := NewBinPackIterator(ctx, static, false, 0)
binp.SetTaskGroup(c.TaskGroup)
+ binp.SetSchedulerConfiguration(testSchedulerConfig)
out := binp.Next()
if out == nil && !c.NoPlace {
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 6a8d2bda9..326bf4780 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -78,6 +78,9 @@ type State interface {
// NodesByNodePool returns an iterator over all nodes in the node pool
NodesByNodePool(ws memdb.WatchSet, poolName string) (memdb.ResultIterator, error)
+ // NodePoolByName is used to lookup a node pool by name.
+ NodePoolByName(ws memdb.WatchSet, poolName string) (*structs.NodePool, error)
+
// AllocsByJob returns the allocations by JobID
AllocsByJob(ws memdb.WatchSet, namespace, jobID string, all bool) ([]*structs.Allocation, error)
@@ -90,7 +93,7 @@ type State interface {
// AllocsByNodeTerminal returns all the allocations by node filtering by terminal status
AllocsByNodeTerminal(ws memdb.WatchSet, node string, terminal bool) ([]*structs.Allocation, error)
- // GetNodeByID is used to lookup a node by ID
+ // NodeByID is used to lookup a node by ID
NodeByID(ws memdb.WatchSet, nodeID string) (*structs.Node, error)
// GetJobByID is used to lookup a job by ID
diff --git a/scheduler/scheduler_system.go b/scheduler/scheduler_system.go
index 1471701eb..5ff636563 100644
--- a/scheduler/scheduler_system.go
+++ b/scheduler/scheduler_system.go
@@ -156,7 +156,7 @@ func (s *SystemScheduler) process() (bool, error) {
// Construct the placement stack
s.stack = NewSystemStack(s.sysbatch, s.ctx)
if !s.job.Stopped() {
- s.stack.SetJob(s.job)
+ s.setJob(s.job)
}
// Compute the target job allocations
@@ -211,6 +211,26 @@ func (s *SystemScheduler) process() (bool, error) {
return true, nil
}
+// setJob updates the stack with the given job and the job's node pool
+// scheduler configuration.
+func (s *SystemScheduler) setJob(job *structs.Job) error {
+ // Fetch node pool and global scheduler configuration to determine how to
+ // configure the scheduler.
+ pool, err := s.state.NodePoolByName(nil, job.NodePool)
+ if err != nil {
+ return fmt.Errorf("failed to get job node pool %q: %v", job.NodePool, err)
+ }
+
+ _, schedConfig, err := s.state.SchedulerConfig()
+ if err != nil {
+ return fmt.Errorf("failed to get scheduler configuration: %v", err)
+ }
+
+ s.stack.SetJob(job)
+ s.stack.SetSchedulerConfiguration(schedConfig.WithNodePool(pool))
+ return nil
+}
+
// computeJobAllocs is used to reconcile differences between the job,
// existing allocations and node status to update the allocations.
func (s *SystemScheduler) computeJobAllocs() error {
diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go
new file mode 100644
index 000000000..003a1571d
--- /dev/null
+++ b/scheduler/scheduler_test.go
@@ -0,0 +1,224 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: MPL-2.0
+
+package scheduler
+
+import (
+ "fmt"
+ "testing"
+
+ "github.com/hashicorp/nomad/ci"
+ "github.com/hashicorp/nomad/helper/iterator"
+ "github.com/hashicorp/nomad/helper/pointer"
+ "github.com/hashicorp/nomad/helper/uuid"
+ "github.com/hashicorp/nomad/nomad/mock"
+ "github.com/hashicorp/nomad/nomad/structs"
+ "github.com/shoenig/test/must"
+)
+
+func TestScheduler_JobRegister_MemoryMaxHonored(t *testing.T) {
+ ci.Parallel(t)
+
+ // Test node pools.
+ poolNoSchedConfig := mock.NodePool()
+ poolNoSchedConfig.SchedulerConfiguration = nil
+
+ poolWithMemOversub := mock.NodePool()
+ poolWithMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
+ MemoryOversubscriptionEnabled: pointer.Of(true),
+ }
+
+ poolNoMemOversub := mock.NodePool()
+ poolNoMemOversub.SchedulerConfiguration = &structs.NodePoolSchedulerConfiguration{
+ MemoryOversubscriptionEnabled: pointer.Of(false),
+ }
+
+ cases := []struct {
+ name string
+ nodePool string
+ cpu int
+ memory int
+ memoryMax int
+ memoryOversubscriptionEnabled bool
+
+ expectedTaskMemoryMax int
+ // expectedTotalMemoryMax should be SUM(MAX(memory, memoryMax)) for all
+ // tasks if memory oversubscription is enabled and SUM(memory) if it's
+ // disabled.
+ expectedTotalMemoryMax int
+ }{
+ {
+ name: "plain no max",
+ nodePool: poolNoSchedConfig.Name,
+ cpu: 100,
+ memory: 200,
+ memoryMax: 0,
+ memoryOversubscriptionEnabled: true,
+
+ expectedTaskMemoryMax: 0,
+ expectedTotalMemoryMax: 200,
+ },
+ {
+ name: "with max",
+ nodePool: poolNoSchedConfig.Name,
+ cpu: 100,
+ memory: 200,
+ memoryMax: 300,
+ memoryOversubscriptionEnabled: true,
+
+ expectedTaskMemoryMax: 300,
+ expectedTotalMemoryMax: 300,
+ },
+ {
+ name: "with max but disabled",
+ nodePool: poolNoSchedConfig.Name,
+ cpu: 100,
+ memory: 200,
+ memoryMax: 300,
+
+ memoryOversubscriptionEnabled: false,
+ expectedTaskMemoryMax: 0,
+ expectedTotalMemoryMax: 200, // same as no max
+ },
+ {
+ name: "with max and enabled by node pool",
+ nodePool: poolWithMemOversub.Name,
+ cpu: 100,
+ memory: 200,
+ memoryMax: 300,
+ memoryOversubscriptionEnabled: false,
+
+ expectedTaskMemoryMax: 300,
+ expectedTotalMemoryMax: 300,
+ },
+ {
+ name: "with max but disabled by node pool",
+ nodePool: poolNoMemOversub.Name,
+ cpu: 100,
+ memory: 200,
+ memoryMax: 300,
+ memoryOversubscriptionEnabled: true,
+
+ expectedTaskMemoryMax: 0,
+ expectedTotalMemoryMax: 200, // same as no max
+ },
+ }
+
+ jobTypes := []string{
+ "batch",
+ "service",
+ "sysbatch",
+ "system",
+ }
+
+ for _, jobType := range jobTypes {
+ for _, c := range cases {
+ t.Run(fmt.Sprintf("%s/%s", jobType, c.name), func(t *testing.T) {
+ h := NewHarness(t)
+
+ // Create node pools.
+ nodePools := []*structs.NodePool{
+ poolNoSchedConfig,
+ poolWithMemOversub,
+ poolNoMemOversub,
+ }
+ h.State.UpsertNodePools(structs.MsgTypeTestSetup, h.NextIndex(), nodePools)
+
+ // Create some nodes.
+ for i := 0; i < 3; i++ {
+ node := mock.Node()
+ node.NodePool = c.nodePool
+ must.NoError(t, h.State.UpsertNode(structs.MsgTypeTestSetup, h.NextIndex(), node))
+ }
+
+ // Set global scheduler configuration.
+ h.State.SchedulerSetConfig(h.NextIndex(), &structs.SchedulerConfiguration{
+ MemoryOversubscriptionEnabled: c.memoryOversubscriptionEnabled,
+ })
+
+ // Create test job.
+ var job *structs.Job
+ switch jobType {
+ case "batch":
+ job = mock.BatchJob()
+ case "service":
+ job = mock.Job()
+ case "sysbatch":
+ job = mock.SystemBatchJob()
+ case "system":
+ job = mock.SystemJob()
+ }
+ job.TaskGroups[0].Count = 1
+ job.NodePool = c.nodePool
+
+ task := job.TaskGroups[0].Tasks[0].Name
+ res := job.TaskGroups[0].Tasks[0].Resources
+ res.CPU = c.cpu
+ res.MemoryMB = c.memory
+ res.MemoryMaxMB = c.memoryMax
+
+ must.NoError(t, h.State.UpsertJob(structs.MsgTypeTestSetup, h.NextIndex(), nil, job))
+
+ // Create a mock evaluation to register the job
+ eval := &structs.Evaluation{
+ Namespace: structs.DefaultNamespace,
+ ID: uuid.Generate(),
+ Priority: job.Priority,
+ TriggeredBy: structs.EvalTriggerJobRegister,
+ JobID: job.ID,
+ Status: structs.EvalStatusPending,
+ }
+
+ must.NoError(t, h.State.UpsertEvals(structs.MsgTypeTestSetup, h.NextIndex(), []*structs.Evaluation{eval}))
+
+ // Process the evaluation
+ var scheduler Factory
+ switch jobType {
+ case "batch":
+ scheduler = NewBatchScheduler
+ case "service":
+ scheduler = NewServiceScheduler
+ case "sysbatch":
+ scheduler = NewSysBatchScheduler
+ case "system":
+ scheduler = NewSystemScheduler
+ }
+ err := h.Process(scheduler, eval)
+ must.NoError(t, err)
+
+ must.Len(t, 1, h.Plans)
+
+ allocs, err := h.State.AllocsByJob(nil, job.Namespace, job.ID, false)
+ must.NoError(t, err)
+
+ // Ensure all allocations placed
+ var expectedAllocCount int
+ switch jobType {
+ case "batch", "service":
+ expectedAllocCount = 1
+ case "system", "sysbatch":
+ nodes, err := h.State.NodesByNodePool(nil, job.NodePool)
+ must.NoError(t, err)
+ expectedAllocCount = iterator.Len(nodes)
+ }
+ must.Len(t, expectedAllocCount, allocs)
+ alloc := allocs[0]
+
+ // check the new AllocatedResources fields, which replace the deprecated Resources fields
+ must.Eq(t, int64(c.cpu), alloc.AllocatedResources.Tasks[task].Cpu.CpuShares)
+ must.Eq(t, int64(c.memory), alloc.AllocatedResources.Tasks[task].Memory.MemoryMB)
+ must.Eq(t, int64(c.expectedTaskMemoryMax), alloc.AllocatedResources.Tasks[task].Memory.MemoryMaxMB)
+
+ // checking old deprecated Resources fields
+ must.Eq(t, c.cpu, alloc.TaskResources[task].CPU)
+ must.Eq(t, c.memory, alloc.TaskResources[task].MemoryMB)
+ must.Eq(t, c.expectedTaskMemoryMax, alloc.TaskResources[task].MemoryMaxMB)
+
+ // check total resource fields - alloc.Resources deprecated field, no modern equivalent
+ must.Eq(t, c.cpu, alloc.Resources.CPU)
+ must.Eq(t, c.memory, alloc.Resources.MemoryMB)
+ must.Eq(t, c.expectedTotalMemoryMax, alloc.Resources.MemoryMaxMB)
+ })
+ }
+ }
+}
diff --git a/scheduler/stack.go b/scheduler/stack.go
index 3691c04dd..dffc9bf56 100644
--- a/scheduler/stack.go
+++ b/scheduler/stack.go
@@ -118,6 +118,13 @@ func (s *GenericStack) SetJob(job *structs.Job) {
}
}
+// SetSchedulerConfiguration applies the given scheduler configuration when
+// processing nodes. Scheduler configuration values may change per job
+// depending on the node pool being used.
+func (s *GenericStack) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) {
+ s.binPack.SetSchedulerConfiguration(schedConfig)
+}
+
func (s *GenericStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
// This block handles trying to select from preferred nodes if options specify them
@@ -275,6 +282,11 @@ func NewSystemStack(sysbatch bool, ctx Context) *SystemStack {
// Apply the bin packing, this depends on the resources needed
// by a particular task group. Enable eviction as system jobs are high
// priority.
+ //
+ // The scheduler configuration is read directly from state but only
+ // values that can't be specified per node pool should be used. Other
+ // values must be merged by calling schedConfig.WithNodePool() and set in
+ // the stack by calling SetSchedulerConfiguration().
_, schedConfig, _ := s.ctx.State().SchedulerConfig()
enablePreemption := true
if schedConfig != nil {
@@ -286,7 +298,7 @@ func NewSystemStack(sysbatch bool, ctx Context) *SystemStack {
}
// Create binpack iterator
- s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0, schedConfig)
+ s.binPack = NewBinPackIterator(ctx, rankSource, enablePreemption, 0)
// Apply score normalization
s.scoreNorm = NewScoreNormalizationIterator(ctx, s.binPack)
@@ -311,6 +323,13 @@ func (s *SystemStack) SetJob(job *structs.Job) {
}
}
+// SetSchedulerConfiguration applies the given scheduler configuration when
+// processing nodes. Scheduler configuration values may change per job
+// depending on the node pool being used.
+func (s *SystemStack) SetSchedulerConfiguration(schedConfig *structs.SchedulerConfiguration) {
+ s.binPack.SetSchedulerConfiguration(schedConfig)
+}
+
func (s *SystemStack) Select(tg *structs.TaskGroup, options *SelectOptions) *RankedNode {
// Reset the binpack selector and context
s.scoreNorm.Reset()
@@ -412,8 +431,7 @@ func NewGenericStack(batch bool, ctx Context) *GenericStack {
// Apply the bin packing, this depends on the resources needed
// by a particular task group.
- _, schedConfig, _ := ctx.State().SchedulerConfig()
- s.binPack = NewBinPackIterator(ctx, rankSource, false, 0, schedConfig)
+ s.binPack = NewBinPackIterator(ctx, rankSource, false, 0)
// Apply the job anti-affinity iterator. This is to avoid placing
// multiple allocations on the same node for this job.
diff --git a/website/content/api-docs/operator/scheduler.mdx b/website/content/api-docs/operator/scheduler.mdx
index fdad37cda..be1b1328c 100644
--- a/website/content/api-docs/operator/scheduler.mdx
+++ b/website/content/api-docs/operator/scheduler.mdx
@@ -67,12 +67,16 @@ $ curl \
settings mentioned below.
- `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler
- binpacks or spreads allocations on available nodes.
+ binpacks or spreads allocations on available nodes. Node pools may set
+ their own [`SchedulerAlgorithm`][np_sched_algo] value that takes precedence
+ over this global value.
- - `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When
- `true`, tasks may exceed their reserved memory limit, if the client has excess
- memory capacity. Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max)
- to take advantage of memory oversubscription.
+ - `MemoryOversubscriptionEnabled` `(bool: false)` - When `true`, tasks may
+ exceed their reserved memory limit if the client has excess memory
+ capacity. Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max)
+ to take advantage of memory oversubscription. Node pools may set their own
+ [`MemoryOversubscriptionEnabled`][np_mem_oversubs] value that takes
+ precedence over this global value.
- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return
permission denied errors for job registration, job dispatch, and job scale APIs,
@@ -148,12 +152,14 @@ server state is authoritative.
- `SchedulerAlgorithm` `(string: "binpack")` - Specifies whether scheduler
binpacks or spreads allocations on available nodes. Possible values are
- `"binpack"` and `"spread"`
+ `"binpack"` and `"spread"`. This value may also be set per [node
+ pool][np_sched_algo].
-- `MemoryOversubscriptionEnabled` `(bool: false)` 1.1 Beta - When
- `true`, tasks may exceed their reserved memory limit, if the client has excess
- memory capacity. Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max)
- to take advantage of memory oversubscription.
+- `MemoryOversubscriptionEnabled` `(bool: false)` - When `true`, tasks may
+ exceed their reserved memory limit if the client has excess memory capacity.
+ Tasks must specify [`memory_max`](/nomad/docs/job-specification/resources#memory_max)
+ to take advantage of memory oversubscription. This value may also be set per
+ [node pool][np_mem_oversubs].
- `RejectJobRegistration` `(bool: false)` - When `true`, the server will return
permission denied errors for job registration, job dispatch, and job scale APIs,
@@ -200,3 +206,5 @@ server state is authoritative.
- `Index` - Current Raft index when the request was received.
[`default_scheduler_config`]: /nomad/docs/configuration/server#default_scheduler_config
+[np_mem_oversubs]: /nomad/docs/other-specifications/node-pool#memory_oversubscription_enabled
+[np_sched_algo]: /nomad/docs/other-specifications/node-pool#scheduler_algorithm
diff --git a/website/content/docs/job-specification/resources.mdx b/website/content/docs/job-specification/resources.mdx
index 3e2be15ac..c01ed1122 100644
--- a/website/content/docs/job-specification/resources.mdx
+++ b/website/content/docs/job-specification/resources.mdx
@@ -97,7 +97,7 @@ the task may exceed the limit and be interrupted; if the task memory is too
high, the cluster is left underutilized.
To help maximize cluster memory utilization while allowing a safety margin for
-unexpected load spikes, Nomad 1.1. lets job authors set two separate memory
+unexpected load spikes, Nomad allows job authors to set two separate memory
limits:
* `memory`: the reserve limit to represent the task’s typical memory usage —
@@ -112,14 +112,15 @@ may kill oversubscribed tasks and reschedule them to other clients. The exact
mechanism for memory pressure is specific to the task driver, operating system,
and application runtime.
-The new max limit attribute is currently supported by the official `docker`,
-`exec`, and `java` task drivers. Consult the documentation of
+The `memory_max` limit attribute is currently supported by the official
+`docker`, `exec`, and `java` task drivers. Consult the documentation of
community-supported task drivers for their memory oversubscription support.
-Memory oversubscription is opt-in. Nomad operators can enable [Memory Oversubscription in the scheduler
-configuration](/nomad/api-docs/operator/scheduler#update-scheduler-configuration). Enterprise customers can use [Resource
-Quotas](/nomad/tutorials/governance-and-policy/quotas) to limit the memory
-oversubscription.
+Memory oversubscription is opt-in. Nomad operators can enable [Memory
+Oversubscription in the scheduler configuration][api_sched_config]. Enterprise
+customers can use [Resource Quotas][tutorial_quota] to limit memory
+oversubscription, and can enable or disable memory oversubscription per [node
+pool][np_sched_config].
To avoid degrading the cluster experience, we recommend examining and monitoring
resource utilization and considering the following suggestions:
@@ -136,6 +137,9 @@ resource utilization and considering the following suggestions:
1GB in aggregate before the memory becomes contended and allocations get
killed.
+[api_sched_config]: /nomad/api-docs/operator/scheduler#update-scheduler-configuration
[device]: /nomad/docs/job-specification/device 'Nomad device Job Specification'
[docker_cpu]: /nomad/docs/drivers/docker#cpu
[exec_cpu]: /nomad/docs/drivers/exec#cpu
+[np_sched_config]: /nomad/docs/other-specifications/node-pool#memory_oversubscription_enabled
+[tutorial_quota]: /nomad/tutorials/governance-and-policy/quotas