server: Validate config num schedulers is between 0 and num CPUs. (#25441)

The `server.num_schedulers` configuration value should be a value
between 0 and the number of CPUs on the machine. The Nomad agent
was not validating the configuration parameter which meant you
could use a negative value or a value much larger than the
available machine CPUs. This change enforces validation of the
configuration value both on server startup and when the agent is
reloaded.

The Nomad API was previously performing only negative-value validation
when updating the scheduler number via the agent HTTP endpoint. This
change extends that validation to ensure the number is also not greater
than the number of CPUs on the machine.
This commit is contained in:
James Rasell
2025-03-20 08:29:57 +01:00
committed by GitHub
parent 220b53aba8
commit 5a157eb123
7 changed files with 120 additions and 23 deletions

3
.changelog/25441.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
server: Validate `num_schedulers` configuration parameter is between 0 and the number of CPUs available on the machine
```

View File

@@ -616,6 +616,15 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
conf.KEKProviderConfigs = agentConfig.KEKProviders
// Ensure the passed number of scheduler is between the bounds of zero and
// the number of CPU cores on the machine. The runtime CPU count object is
// populated at process start time, so there is no overhead in calling the
// function compared to saving the value.
if conf.NumSchedulers < 0 || conf.NumSchedulers > runtime.NumCPU() {
return nil, fmt.Errorf("number of schedulers should be between 0 and %d",
runtime.NumCPU())
}
return conf, nil
}

View File

@@ -15,6 +15,7 @@ import (
"net/url"
"os"
"reflect"
"runtime"
"strconv"
"strings"
"sync"
@@ -1702,6 +1703,14 @@ type schedulerWorkerConfigTest_testExpect struct {
// These test cases are run for both the ACL and Non-ACL enabled servers. When
// ACLS are not enabled, the request.aclTokens are ignored.
func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequestTest {
numCPU := runtime.NumCPU()
halfCPU := numCPU / 2
if halfCPU == 0 {
halfCPU = 1
}
forbidden := schedulerWorkerConfigTest_testExpect{
expectedResponseCode: http.StatusForbidden,
expectedResponse: structs.ErrPermissionDenied.Error(),
@@ -1716,12 +1725,11 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ
}
success1 := schedulerWorkerConfigTest_testExpect{
expectedResponseCode: http.StatusOK,
expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 8},
expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: numCPU},
}
success2 := schedulerWorkerConfigTest_testExpect{
expectedResponseCode: http.StatusOK,
expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: 9},
expectedResponse: &api.AgentSchedulerWorkerConfigResponse{EnabledSchedulers: []string{"_core", "batch"}, NumSchedulers: halfCPU},
}
return []scheduleWorkerConfigTest_workerRequestTest{
@@ -1780,7 +1788,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ
request: schedulerWorkerConfigTest_testRequest{
verb: http.MethodPost,
aclToken: "",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, halfCPU),
},
whenACLNotEnabled: success2,
whenACLEnabled: forbidden,
@@ -1790,7 +1798,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ
request: schedulerWorkerConfigTest_testRequest{
verb: http.MethodPut,
aclToken: "",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, numCPU),
},
whenACLNotEnabled: success1,
whenACLEnabled: forbidden,
@@ -1800,7 +1808,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ
request: schedulerWorkerConfigTest_testRequest{
verb: http.MethodPost,
aclToken: "node_write",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, halfCPU),
},
whenACLNotEnabled: success2,
whenACLEnabled: forbidden,
@@ -1810,7 +1818,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ
request: schedulerWorkerConfigTest_testRequest{
verb: http.MethodPut,
aclToken: "node_write",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, numCPU),
},
whenACLNotEnabled: success1,
whenACLEnabled: forbidden,
@@ -1820,7 +1828,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ
request: schedulerWorkerConfigTest_testRequest{
verb: http.MethodPost,
aclToken: "agent_write",
requestBody: `{"num_schedulers":9,"enabled_schedulers":["_core", "batch"]}`,
requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, halfCPU),
},
whenACLNotEnabled: success2,
whenACLEnabled: success2,
@@ -1830,7 +1838,7 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ
request: schedulerWorkerConfigTest_testRequest{
verb: http.MethodPut,
aclToken: "agent_write",
requestBody: `{"num_schedulers":8,"enabled_schedulers":["_core", "batch"]}`,
requestBody: fmt.Sprintf(`{"num_schedulers":%d,"enabled_schedulers":["_core", "batch"]}`, numCPU),
},
whenACLNotEnabled: success1,
whenACLEnabled: success1,
@@ -1895,6 +1903,26 @@ func schedulerWorkerConfigTest_testCases() []scheduleWorkerConfigTest_workerRequ
whenACLNotEnabled: invalidRequest,
whenACLEnabled: invalidRequest,
},
{
name: "post with too many schedulers",
request: schedulerWorkerConfigTest_testRequest{
verb: http.MethodPost,
aclToken: "agent_write",
requestBody: `{"num_schedulers":9223372036854775807,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: invalidRequest,
whenACLEnabled: invalidRequest,
},
{
name: "put with too many schedulers",
request: schedulerWorkerConfigTest_testRequest{
verb: http.MethodPut,
aclToken: "agent_write",
requestBody: `{"num_schedulers":9223372036854775807,"enabled_schedulers":["_core", "batch"]}`,
},
whenACLNotEnabled: invalidRequest,
whenACLEnabled: invalidRequest,
},
}
}
@@ -1902,8 +1930,7 @@ func TestHTTP_AgentSchedulerWorkerConfigRequest_NoACL(t *testing.T) {
ci.Parallel(t)
configFn := func(c *Config) {
var numSchedulers = 8
c.Server.NumSchedulers = &numSchedulers
c.Server.NumSchedulers = pointer.Of(runtime.NumCPU())
c.Server.EnabledSchedulers = []string{"_core", "batch"}
c.Client.Enabled = false
}
@@ -1935,8 +1962,7 @@ func TestHTTP_AgentSchedulerWorkerConfigRequest_ACL(t *testing.T) {
ci.Parallel(t)
configFn := func(c *Config) {
var numSchedulers = 8
c.Server.NumSchedulers = &numSchedulers
c.Server.NumSchedulers = pointer.Of(runtime.NumCPU())
c.Server.EnabledSchedulers = []string{"_core", "batch"}
c.Client.Enabled = false
}

View File

@@ -8,6 +8,7 @@ import (
"math"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
@@ -678,6 +679,61 @@ func TestAgent_ServerConfig_RaftProtocol_3(t *testing.T) {
}
}
// Test_convertServerConfig_errors exercises the NumSchedulers bounds check in
// convertServerConfig: a negative value or a value greater than
// runtime.NumCPU() must return an error, while a value within bounds must not.
func Test_convertServerConfig_errors(t *testing.T) {
ci.Parallel(t)
// This helper function provides an easy way to modify individual parameters
// within the configuration, without having to populate all objects that
// cause a panic when missing.
overlayDefaultFunc := func(cb func(*Config)) *Config {
defaultConfig := DevConfig(nil)
if cb != nil {
cb(defaultConfig)
}
// Error intentionally discarded; address normalization is not the
// behavior under test here.
_ = defaultConfig.normalizeAddrs()
return defaultConfig
}
testCases := []struct {
name string
inputConfig *Config
expectErr bool
}{
{
// math.MaxInt64 on 64-bit platforms; guaranteed above any CPU count.
name: "num schedulers too big",
inputConfig: overlayDefaultFunc(func(config *Config) {
config.Server.NumSchedulers = pointer.Of(1<<63 - 1)
}),
expectErr: true,
},
{
name: "num schedulers negative",
inputConfig: overlayDefaultFunc(func(config *Config) {
config.Server.NumSchedulers = pointer.Of(-100)
}),
expectErr: true,
},
{
// Exactly the upper bound is valid (bounds are inclusive).
name: "valid",
inputConfig: overlayDefaultFunc(func(config *Config) {
config.Server.NumSchedulers = pointer.Of(runtime.NumCPU())
}),
expectErr: false,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Only the error result is relevant to these cases.
_, actualErr := convertServerConfig(tc.inputConfig)
if tc.expectErr {
must.Error(t, actualErr)
} else {
must.NoError(t, actualErr)
}
})
}
}
func TestConvertClientConfig(t *testing.T) {
ci.Parallel(t)
cases := []struct {

View File

@@ -6,6 +6,7 @@ package nomad
import (
"errors"
"fmt"
"runtime"
"sort"
"strconv"
"testing"
@@ -1426,11 +1427,11 @@ func TestLeader_PausingWorkers(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 12
c.NumSchedulers = runtime.NumCPU()
})
defer cleanupS1()
testutil.WaitForLeader(t, s1.RPC)
require.Len(t, s1.workers, 12)
require.Len(t, s1.workers, runtime.NumCPU())
// this satisfies the require.Eventually test interface
checkPaused := func(count int) func() bool {
@@ -1450,7 +1451,7 @@ func TestLeader_PausingWorkers(t *testing.T) {
}
// acquiring leadership should have paused 3/4 of the workers
require.Eventually(t, checkPaused(9), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change")
require.Eventually(t, checkPaused(3*runtime.NumCPU()/4), 1*time.Second, 10*time.Millisecond, "scheduler workers did not pause within a second at leadership change")
err := s1.revokeLeadership()
require.NoError(t, err)

View File

@@ -14,6 +14,7 @@ import (
"net/rpc"
"os"
"path/filepath"
"runtime"
"sort"
"strconv"
"strings"
@@ -1642,12 +1643,12 @@ func (swpa SchedulerWorkerPoolArgs) IsInvalid() bool {
return !swpa.IsValid()
}
// IsValid verifies that the pool arguments are valid. That is, they have a non-negative
// numSchedulers value and the enabledSchedulers list has _core and only refers to known
// IsValid verifies that the pool arguments are valid. That is, they have a
// non-negative numSchedulers value which is less than the number of CPUs on the
// machine and the enabledSchedulers list has _core and only refers to known
// schedulers.
func (swpa SchedulerWorkerPoolArgs) IsValid() bool {
if swpa.NumSchedulers < 0 {
// the pool has to be non-negative
if swpa.NumSchedulers < 0 || swpa.NumSchedulers > runtime.NumCPU() {
return false
}
@@ -1686,7 +1687,7 @@ func getSchedulerWorkerPoolArgsFromConfigLocked(c *Config) *SchedulerWorkerPoolA
}
}
// GetSchedulerWorkerInfo returns a slice of WorkerInfos from all of
// GetSchedulerWorkersInfo returns a slice of WorkerInfos from all of
// the running scheduler workers.
func (s *Server) GetSchedulerWorkersInfo() []WorkerInfo {
s.workerLock.RLock()

View File

@@ -7,6 +7,7 @@ import (
"context"
"fmt"
"path"
"runtime"
"strings"
"testing"
"time"
@@ -566,14 +567,14 @@ func TestServer_ReloadSchedulers_NumSchedulers(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 8
c.NumSchedulers = runtime.NumCPU()
})
defer cleanupS1()
require.Equal(t, s1.config.NumSchedulers, len(s1.workers))
config := DefaultConfig()
config.NumSchedulers = 4
config.NumSchedulers = runtime.NumCPU() / 2
require.NoError(t, s1.Reload(config))
time.Sleep(1 * time.Second)