diff --git a/.changelog/25441.txt b/.changelog/25441.txt
new file mode 100644
index 000000000..0e6c2b6b1
--- /dev/null
+++ b/.changelog/25441.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+server: Validate `num_schedulers` configuration parameter is between 0 and the number of CPUs available on the machine
+```
diff --git a/command/agent/agent.go b/command/agent/agent.go
index 8d8581b22..c8606f574 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -616,6 +616,15 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
 
 	conf.KEKProviderConfigs = agentConfig.KEKProviders
 
+	// Ensure the passed number of schedulers is between the bounds of zero
+	// and the number of CPU cores on the machine. The runtime CPU count is
+	// populated at process start time, so there is no overhead in calling
+	// the function compared to saving the value.
+	if conf.NumSchedulers < 0 || conf.NumSchedulers > runtime.NumCPU() {
+		return nil, fmt.Errorf("number of schedulers should be between 0 and %d",
+			runtime.NumCPU())
+	}
+
 	return conf, nil
 }
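Note on the check above: the upper bound is inclusive, so a `num_schedulers` value equal to the core count passes. A minimal standalone sketch of the same bounds logic (not part of the patch; `validateNumSchedulers` is a hypothetical name used only for illustration):

```go
package main

import (
	"fmt"
	"runtime"
)

// validateNumSchedulers mirrors the guard added to convertServerConfig:
// the value must fall in the inclusive range [0, runtime.NumCPU()].
func validateNumSchedulers(n int) error {
	if n < 0 || n > runtime.NumCPU() {
		return fmt.Errorf("number of schedulers should be between 0 and %d", runtime.NumCPU())
	}
	return nil
}

func main() {
	fmt.Println(validateNumSchedulers(0))                    // <nil>: zero scheduler workers is allowed
	fmt.Println(validateNumSchedulers(runtime.NumCPU()))     // <nil>: the upper bound is inclusive
	fmt.Println(validateNumSchedulers(-1))                   // error: negative counts are rejected
	fmt.Println(validateNumSchedulers(runtime.NumCPU() + 1)) // error: more workers than cores
}
```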
"batch"]}`, numCPU), }, whenACLNotEnabled: success1, whenACLEnabled: forbidden, @@ -1800,7 +1808,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPost, aclToken: "node_write", - requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, halfCPU), }, whenACLNotEnabled: success2, whenACLEnabled: forbidden, @@ -1810,7 +1818,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPut, aclToken: "node_write", - requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, numCPU), }, whenACLNotEnabled: success1, whenACLEnabled: forbidden, @@ -1820,7 +1828,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPost, aclToken: "agent_write", - requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, halfCPU), }, whenACLNotEnabled: success2, whenACLEnabled: success2, @@ -1830,7 +1838,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPut, aclToken: "agent_write", - requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, numCPU), }, whenACLNotEnabled: success1, whenACLEnabled: success1, @@ -1895,6 +1903,26 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ whenACLNotEnabled: invalidRequest, whenACLEnabled: invalidRequest, }, + { + name: "post with too many schedulers", + request: schedulerWorkerConfigTest_testRequest{ + verb: http.MethodPost, + aclToken: "agent_write", + requestBody: `{"num_schedulers":9223372036854775807,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: invalidRequest, + }, + { + name: "put with too many schedulers", + request: schedulerWorkerConfigTest_testRequest{ + verb: http.MethodPut, + aclToken: "agent_write", + requestBody: `{"num_schedulers":9223372036854775807,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: invalidRequest, + }, } } @@ -1902,8 +1930,7 @@ func TestHTTP_AgentSchedulerWorkerConfigRequest_NoACL(t *testing.T) { ci.Parallel(t) configFn := func(c *Config) { - var numSchedulers = 8 - c.Server.NumSchedulers = &numSchedulers + c.Server.NumSchedulers = pointer.Of(runtime.NumCPU()) c.Server.EnabledSchedulers = []string{"_core", "batch"} c.Client.Enabled = false } @@ -1935,8 +1962,7 @@ func TestHTTP_AgentSchedulerWorkerConfigRequest_ACL(t *testing.T) { ci.Parallel(t) configFn := func(c *Config) { - var numSchedulers = 8 - c.Server.NumSchedulers = &numSchedulers + c.Server.NumSchedulers = pointer.Of(runtime.NumCPU()) c.Server.EnabledSchedulers = []string{"_core", "batch"} c.Client.Enabled = false } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index aac7d6191..89c12cae9 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -8,6 +8,7 @@ import ( "math" "os" 
"path/filepath" + "runtime" "strings" "testing" "time" @@ -678,6 +679,61 @@ func TestAgent_ServerConfig_RaftProtocol_3(t *testing.T) { } } +func Test_convertServerConfig_errors(t *testing.T) { + ci.Parallel(t) + + // This helper function provides an easy way to modify individual parameters + // within the configuration, without having to populate all objects that + // cause a panic when missing. + overlayDefaultFunc := func(cb func(*Config)) *Config { + defaultConfig := DevConfig(nil) + if cb != nil { + cb(defaultConfig) + } + _ = defaultConfig.normalizeAddrs() + return defaultConfig + } + + testCases := []struct { + name string + inputConfig *Config + expectErr bool + }{ + { + name: "num schedulers too big", + inputConfig: overlayDefaultFunc(func(config *Config) { + config.Server.NumSchedulers = pointer.Of(1<<63 - 1) + }), + expectErr: true, + }, + { + name: "num schedulers negative", + inputConfig: overlayDefaultFunc(func(config *Config) { + config.Server.NumSchedulers = pointer.Of(-100) + }), + expectErr: true, + }, + { + name: "valid", + inputConfig: overlayDefaultFunc(func(config *Config) { + config.Server.NumSchedulers = pointer.Of(runtime.NumCPU()) + }), + expectErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, actualErr := convertServerConfig(tc.inputConfig) + if tc.expectErr { + must.Error(t, actualErr) + } else { + must.NoError(t, actualErr) + } + }) + } +} + func TestConvertClientConfig(t *testing.T) { ci.Parallel(t) cases := []struct { diff --git a/nomad/leader_test.go b/nomad/leader_test.go index b8dd84711..e9f8967ef 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -6,6 +6,7 @@ package nomad import ( "errors" "fmt" + "runtime" "sort" "strconv" "testing" @@ -1426,11 +1427,11 @@ func TestLeader_PausingWorkers(t *testing.T) { ci.Parallel(t) s1, cleanupS1 := TestServer(t, func(c *Config) { - c.NumSchedulers = 12 + c.NumSchedulers = runtime.NumCPU() }) defer cleanupS1() testutil.WaitForLeader(t, s1.RPC) - require.Len(t, s1.workers, 12) + require.Len(t, s1.workers, runtime.NumCPU()) // this satisfies the require.Eventually test interface checkPaused := func(count int) func() bool { @@ -1450,7 +1451,7 @@ func TestLeader_PausingWorkers(t *testing.T) { } // acquiring leadership should have paused 3/4 of the workers - require.Eventually(t, checkPaused(9), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change") + require.Eventually(t, checkPaused(3*runtime.NumCPU()/4), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change") err := s1.revokeLeadership() require.NoError(t, err) diff --git a/nomad/server.go b/nomad/server.go index 6383938de..b74dd787f 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -14,6 +14,7 @@ import ( "net/rpc" "os" "path/filepath" + "runtime" "sort" "strconv" "strings" @@ -1642,12 +1643,12 @@ func (swpa SchedulerWorkerPoolArgs) IsInvalid() bool { return !swpa.IsValid() } -// IsValid verifies that the pool arguments are valid. That is, they have a non-negative -// numSchedulers value and the enabledSchedulers list has _core and only refers to known +// IsValid verifies that the pool arguments are valid. That is, they have a +// non-negative numSchedulers value which is less than the number of CPUs on the +// machine and the enabledSchedulers list has _core and only refers to known // schedulers. 
diff --git a/nomad/server.go b/nomad/server.go
index 6383938de..b74dd787f 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -14,6 +14,7 @@ import (
 	"net/rpc"
 	"os"
 	"path/filepath"
+	"runtime"
 	"sort"
 	"strconv"
 	"strings"
@@ -1642,12 +1643,12 @@ func (swpa SchedulerWorkerPoolArgs) IsInvalid() bool {
 	return !swpa.IsValid()
 }
 
-// IsValid verifies that the pool arguments are valid. That is, they have a non-negative
-// numSchedulers value and the enabledSchedulers list has _core and only refers to known
+// IsValid verifies that the pool arguments are valid. That is, they have a
+// non-negative numSchedulers value no greater than the number of CPUs on the
+// machine, and the enabledSchedulers list has _core and only refers to known
 // schedulers.
 func (swpa SchedulerWorkerPoolArgs) IsValid() bool {
-	if swpa.NumSchedulers < 0 {
-		// the pool has to be non-negative
+	if swpa.NumSchedulers < 0 || swpa.NumSchedulers > runtime.NumCPU() {
 		return false
 	}
@@ -1686,7 +1687,7 @@ func getSchedulerWorkerPoolArgsFromConfigLocked(c *Config) *SchedulerWorkerPoolA
 	}
 }
 
-// GetSchedulerWorkerInfo returns a slice of WorkerInfos from all of
+// GetSchedulerWorkersInfo returns a slice of WorkerInfos from all of
 // the running scheduler workers.
 func (s *Server) GetSchedulerWorkersInfo() []WorkerInfo {
 	s.workerLock.RLock()
diff --git a/nomad/server_test.go b/nomad/server_test.go
index 4893e3cf6..a4422df38 100644
--- a/nomad/server_test.go
+++ b/nomad/server_test.go
@@ -7,6 +7,7 @@ import (
 	"context"
 	"fmt"
 	"path"
+	"runtime"
 	"strings"
 	"testing"
 	"time"
@@ -566,14 +567,14 @@ func TestServer_ReloadSchedulers_NumSchedulers(t *testing.T) {
 	ci.Parallel(t)
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
-		c.NumSchedulers = 8
+		c.NumSchedulers = runtime.NumCPU()
 	})
 	defer cleanupS1()
 
 	require.Equal(t, s1.config.NumSchedulers, len(s1.workers))
 
 	config := DefaultConfig()
-	config.NumSchedulers = 4
+	config.NumSchedulers = runtime.NumCPU() / 2
 	require.NoError(t, s1.Reload(config))
 	time.Sleep(1 * time.Second)
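One boundary worth noting in the reload test above: on a single-core machine `runtime.NumCPU() / 2` truncates to 0, which the validation still accepts (0 is the lower bound of the valid range), whereas the endpoint test cases clamp `halfCPU` up to 1 so their expected responses stay non-trivial. A sketch of the distinction:

```go
package main

import (
	"fmt"
	"runtime"
)

func main() {
	numCPU := runtime.NumCPU()

	// The reload test uses the raw halved value, which may be 0 on one core.
	reloadTarget := numCPU / 2

	// The endpoint test cases clamp the same value up to 1.
	halfCPU := numCPU / 2
	if halfCPU == 0 {
		halfCPU = 1
	}

	fmt.Printf("reload target %d, endpoint target %d, both within [0, %d]\n",
		reloadTarget, halfCPU, numCPU)
}
```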