From 5a157eb1230f97152300912d954ba33d1e5125eb Mon Sep 17 00:00:00 2001 From: James Rasell Date: Thu, 20 Mar 2025 08:29:57 +0100 Subject: [PATCH] server: Validate config num schedulers is between 0 and num CPUs. (#25441) The `server.num_scheduler` configuration value should be a value between 0 and the number of CPUs on the machine. The Nomad agent was not validating the configuration parameter which meant you could use a negative value or a value much larger than the available machine CPUs. This change enforces validation of the configuration value both on server startup and when the agent is reloaded. The Nomad API was only performing negative value validation when updating the scheduler number via this method. This change adds to the validation to ensure the number is not greater than the CPUs on the machine. --- .changelog/25441.txt | 3 ++ command/agent/agent.go | 9 +++++ command/agent/agent_endpoint_test.go | 52 +++++++++++++++++++------- command/agent/agent_test.go | 56 ++++++++++++++++++++++++++++ nomad/leader_test.go | 7 ++-- nomad/server.go | 11 +++--- nomad/server_test.go | 5 ++- 7 files changed, 120 insertions(+), 23 deletions(-) create mode 100644 .changelog/25441.txt diff --git a/.changelog/25441.txt b/.changelog/25441.txt new file mode 100644 index 000000000..0e6c2b6b1 --- /dev/null +++ b/.changelog/25441.txt @@ -0,0 +1,3 @@ +```release-note:bug +server: Validate `num_schedulers` configuration parameter is between 0 and the number of CPUs available on the machine +``` diff --git a/command/agent/agent.go b/command/agent/agent.go index 8d8581b22..c8606f574 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -616,6 +616,15 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) { conf.KEKProviderConfigs = agentConfig.KEKProviders + // Ensure the passed number of scheduler is between the bounds of zero and + // the number of CPU cores on the machine. The runtime CPU count object is + // populated at process start time, so there is no overhead in calling the + // function compared to saving the value. + if conf.NumSchedulers < 0 || conf.NumSchedulers > runtime.NumCPU() { + return nil, fmt.Errorf("number of schedulers should be between 0 and %d", + runtime.NumCPU()) + } + return conf, nil } diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index c840648fd..586cec6ce 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -15,6 +15,7 @@ import ( "net/url" "os" "reflect" + "runtime" "strconv" "strings" "sync" @@ -1702,6 +1703,14 @@ type schedulerWorkerConfigTest_testExpect struct { // These test cases are run for both the ACL and Non-ACL enabled servers. When // ACLS are not enabled, the request.aclTokens are ignored. func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequestTest { + + numCPU := runtime.NumCPU() + + halfCPU := numCPU / 2 + if halfCPU == 0 { + halfCPU = 1 + } + forbidden := schedulerWorkerConfigTest_testExpect{ expectedResponseCode: http.StatusForbidden, expectedResponse: structs.ErrPermissionDenied.Error(), @@ -1716,12 +1725,11 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ } success1 := schedulerWorkerConfigTest_testExpect{ expectedResponseCode: http.StatusOK, - expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 8}, + expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: numCPU}, } - success2 := schedulerWorkerConfigTest_testExpect{ expectedResponseCode: http.StatusOK, - expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 9}, + expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: halfCPU}, } return []scheduleWorkerConfigTest_workerRequestTest{ @@ -1780,7 +1788,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPost, aclToken: "", - requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, halfCPU), }, whenACLNotEnabled: success2, whenACLEnabled: forbidden, @@ -1790,7 +1798,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPut, aclToken: "", - requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, numCPU), }, whenACLNotEnabled: success1, whenACLEnabled: forbidden, @@ -1800,7 +1808,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPost, aclToken: "node_write", - requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, halfCPU), }, whenACLNotEnabled: success2, whenACLEnabled: forbidden, @@ -1810,7 +1818,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPut, aclToken: "node_write", - requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, numCPU), }, whenACLNotEnabled: success1, whenACLEnabled: forbidden, @@ -1820,7 +1828,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPost, aclToken: "agent_write", - requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, halfCPU), }, whenACLNotEnabled: success2, whenACLEnabled: success2, @@ -1830,7 +1838,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ request: schedulerWorkerConfigTest_testRequest{ verb: http.MethodPut, aclToken: "agent_write", - requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`, + requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, numCPU), }, whenACLNotEnabled: success1, whenACLEnabled: success1, @@ -1895,6 +1903,26 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ whenACLNotEnabled: invalidRequest, whenACLEnabled: invalidRequest, }, + { + name: "post with too many schedulers", + request: schedulerWorkerConfigTest_testRequest{ + verb: http.MethodPost, + aclToken: "agent_write", + requestBody: `{"num_schedulers":9223372036854775807,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: invalidRequest, + }, + { + name: "put with too many schedulers", + request: schedulerWorkerConfigTest_testRequest{ + verb: http.MethodPut, + aclToken: "agent_write", + requestBody: `{"num_schedulers":9223372036854775807,"enabled_schedulers":["_core", "batch"]}`, + }, + whenACLNotEnabled: invalidRequest, + whenACLEnabled: invalidRequest, + }, } } @@ -1902,8 +1930,7 @@ func TestHTTP_AgentSchedulerWorkerConfigRequest_NoACL(t *testing.T) { ci.Parallel(t) configFn := func(c *Config) { - var numSchedulers = 8 - c.Server.NumSchedulers = &numSchedulers + c.Server.NumSchedulers = pointer.Of(runtime.NumCPU()) c.Server.EnabledSchedulers = []string{"_core", "batch"} c.Client.Enabled = false } @@ -1935,8 +1962,7 @@ func TestHTTP_AgentSchedulerWorkerConfigRequest_ACL(t *testing.T) { ci.Parallel(t) configFn := func(c *Config) { - var numSchedulers = 8 - c.Server.NumSchedulers = &numSchedulers + c.Server.NumSchedulers = pointer.Of(runtime.NumCPU()) c.Server.EnabledSchedulers = []string{"_core", "batch"} c.Client.Enabled = false } diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index aac7d6191..89c12cae9 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -8,6 +8,7 @@ import ( "math" "os" "path/filepath" + "runtime" "strings" "testing" "time" @@ -678,6 +679,61 @@ func TestAgent_ServerConfig_RaftProtocol_3(t *testing.T) { } } +func Test_convertServerConfig_errors(t *testing.T) { + ci.Parallel(t) + + // This helper function provides an easy way to modify individual parameters + // within the configuration, without having to populate all objects that + // cause a panic when missing. + overlayDefaultFunc := func(cb func(*Config)) *Config { + defaultConfig := DevConfig(nil) + if cb != nil { + cb(defaultConfig) + } + _ = defaultConfig.normalizeAddrs() + return defaultConfig + } + + testCases := []struct { + name string + inputConfig *Config + expectErr bool + }{ + { + name: "num schedulers too big", + inputConfig: overlayDefaultFunc(func(config *Config) { + config.Server.NumSchedulers = pointer.Of(1<<63 - 1) + }), + expectErr: true, + }, + { + name: "num schedulers negative", + inputConfig: overlayDefaultFunc(func(config *Config) { + config.Server.NumSchedulers = pointer.Of(-100) + }), + expectErr: true, + }, + { + name: "valid", + inputConfig: overlayDefaultFunc(func(config *Config) { + config.Server.NumSchedulers = pointer.Of(runtime.NumCPU()) + }), + expectErr: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + _, actualErr := convertServerConfig(tc.inputConfig) + if tc.expectErr { + must.Error(t, actualErr) + } else { + must.NoError(t, actualErr) + } + }) + } +} + func TestConvertClientConfig(t *testing.T) { ci.Parallel(t) cases := []struct { diff --git a/nomad/leader_test.go b/nomad/leader_test.go index b8dd84711..e9f8967ef 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -6,6 +6,7 @@ package nomad import ( "errors" "fmt" + "runtime" "sort" "strconv" "testing" @@ -1426,11 +1427,11 @@ func TestLeader_PausingWorkers(t *testing.T) { ci.Parallel(t) s1, cleanupS1 := TestServer(t, func(c *Config) { - c.NumSchedulers = 12 + c.NumSchedulers = runtime.NumCPU() }) defer cleanupS1() testutil.WaitForLeader(t, s1.RPC) - require.Len(t, s1.workers, 12) + require.Len(t, s1.workers, runtime.NumCPU()) // this satisfies the require.Eventually test interface checkPaused := func(count int) func() bool { @@ -1450,7 +1451,7 @@ func TestLeader_PausingWorkers(t *testing.T) { } // acquiring leadership should have paused 3/4 of the workers - require.Eventually(t, checkPaused(9), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change") + require.Eventually(t, checkPaused(3*runtime.NumCPU()/4), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change") err := s1.revokeLeadership() require.NoError(t, err) diff --git a/nomad/server.go b/nomad/server.go index 6383938de..b74dd787f 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -14,6 +14,7 @@ import ( "net/rpc" "os" "path/filepath" + "runtime" "sort" "strconv" "strings" @@ -1642,12 +1643,12 @@ func (swpa SchedulerWorkerPoolArgs) IsInvalid() bool { return !swpa.IsValid() } -// IsValid verifies that the pool arguments are valid. That is, they have a non-negative -// numSchedulers value and the enabledSchedulers list has _core and only refers to known +// IsValid verifies that the pool arguments are valid. That is, they have a +// non-negative numSchedulers value which is less than the number of CPUs on the +// machine and the enabledSchedulers list has _core and only refers to known // schedulers. func (swpa SchedulerWorkerPoolArgs) IsValid() bool { - if swpa.NumSchedulers < 0 { - // the pool has to be non-negative + if swpa.NumSchedulers < 0 || swpa.NumSchedulers > runtime.NumCPU() { return false } @@ -1686,7 +1687,7 @@ func getSchedulerWorkerPoolArgsFromConfigLocked(c *Config) *SchedulerWorkerPoolA } } -// GetSchedulerWorkerInfo returns a slice of WorkerInfos from all of +// GetSchedulerWorkersInfo returns a slice of WorkerInfos from all of // the running scheduler workers. func (s *Server) GetSchedulerWorkersInfo() []WorkerInfo { s.workerLock.RLock() diff --git a/nomad/server_test.go b/nomad/server_test.go index 4893e3cf6..a4422df38 100644 --- a/nomad/server_test.go +++ b/nomad/server_test.go @@ -7,6 +7,7 @@ import ( "context" "fmt" "path" + "runtime" "strings" "testing" "time" @@ -566,14 +567,14 @@ func TestServer_ReloadSchedulers_NumSchedulers(t *testing.T) { ci.Parallel(t) s1, cleanupS1 := TestServer(t, func(c *Config) { - c.NumSchedulers = 8 + c.NumSchedulers = runtime.NumCPU() }) defer cleanupS1() require.Equal(t, s1.config.NumSchedulers, len(s1.workers)) config := DefaultConfig() - config.NumSchedulers = 4 + config.NumSchedulers = runtime.NumCPU() / 2 require.NoError(t, s1.Reload(config)) time.Sleep(1 * time.Second)