Add job_max_count option to keep Nomad server from running out of memory (#26858)

If a Nomad job is started with a large number of instances (e.g. 4 billion),
then the Nomad servers that attempt to schedule it will run out of memory and
crash. While it's unlikely that anyone would intentionally schedule a job with 4
billion instances, we have occasionally run into issues with bugs in external
automation. For example, an automated deployment system running on a test
environment had an off-by-one error, and deployed a job with count = uint32(-1),
causing the Nomad servers for that environment to run out of memory and crash.

To prevent this, this PR introduces a job_max_count Nomad server configuration
parameter. job_max_count limits the number of allocs that may be created from a
job. The default value is 50000, which is low enough that a job with the
maximum possible number of allocs will not require much memory on the server,
but still much higher than the number of allocs in the largest Nomad job we
have ever run.
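
As a quick illustration, a minimal server stanza sketch (the limit value here
is an example, not a recommendation):

```hcl
# Sketch: cap any single job at 100000 total allocations across all of
# its task groups. A value of 0 disables the limit.
server {
  enabled       = true
  job_max_count = 100000
}
```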
Commit 26485c45a2 (parent 48863bda8a)
Brendan MacDonell, committed by GitHub, 2025-10-06 06:35:10 -07:00
16 changed files with 198 additions and 2 deletions

.changelog/26858.txt (new file)

@@ -0,0 +1,3 @@
```release-note:improvement
config: Added job_max_count option to limit number of allocs for a single job
```


@@ -352,6 +352,15 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
	conf.JobMaxPriority = jobMaxPriority
	conf.JobDefaultPriority = jobDefaultPriority

	jobMaxCount := structs.JobDefaultMaxCount
	if agentConfig.Server.JobMaxCount != nil {
		jobMaxCount = *agentConfig.Server.JobMaxCount
		if jobMaxCount < 0 {
			return nil, fmt.Errorf("job_max_count (%d) cannot be negative", jobMaxCount)
		}
	}
	conf.JobMaxCount = jobMaxCount

	if agentConfig.Server.JobTrackedVersions != nil {
		if *agentConfig.Server.JobTrackedVersions <= 0 {
			return nil, fmt.Errorf("job_tracked_versions must be greater than 0")


@@ -1945,6 +1945,71 @@ func TestAgent_ServerConfig_JobDefaultPriority_Bad(t *testing.T) {
	}
}

func TestAgent_ServerConfig_JobMaxCount(t *testing.T) {
	ci.Parallel(t)

	cases := []struct {
		configured  *int
		expected    int
		expectedErr string
	}{
		{
			configured:  nil,
			expected:    structs.JobDefaultMaxCount,
			expectedErr: "",
		},
		{
			configured:  pointer.Of(1),
			expected:    1,
			expectedErr: "",
		},
		{
			configured:  pointer.Of(0),
			expected:    0,
			expectedErr: "",
		},
		{
			configured:  pointer.Of(structs.JobDefaultMaxCount),
			expected:    structs.JobDefaultMaxCount,
			expectedErr: "",
		},
		{
			configured:  pointer.Of(2 * structs.JobDefaultMaxCount),
			expected:    2 * structs.JobDefaultMaxCount,
			expectedErr: "",
		},
		{
			configured:  pointer.Of(-1),
			expected:    0,
			expectedErr: "job_max_count (-1) cannot be negative",
		},
		{
			configured:  pointer.Of(-3),
			expected:    0,
			expectedErr: "job_max_count (-3) cannot be negative",
		},
	}

	for _, tc := range cases {
		t.Run(fmt.Sprint(tc.configured), func(t *testing.T) {
			conf := DevConfig(nil)
			must.NoError(t, conf.normalizeAddrs())
			conf.Server.JobMaxCount = tc.configured

			serverConf, err := convertServerConfig(conf)
			if tc.expectedErr != "" {
				must.Error(t, err)
				must.ErrorContains(t, err, tc.expectedErr)
			} else {
				must.NoError(t, err)
				must.Eq(t, tc.expected, serverConf.JobMaxCount)
			}
		})
	}
}

func Test_convertServerConfig_clientIntroduction(t *testing.T) {
	ci.Parallel(t)


@@ -761,6 +761,9 @@ type ServerConfig struct {
	// JobMaxPriority is an upper bound on the Job priority.
	JobMaxPriority *int `hcl:"job_max_priority"`

	// JobMaxCount is an upper bound on the number of instances in a Job.
	JobMaxCount *int `hcl:"job_max_count"`

	// JobMaxSourceSize limits the maximum size of a jobs source hcl/json
	// before being discarded automatically. If unset, the maximum size defaults
	// to 1 MB. If the value is zero, no job sources will be stored.
@@ -810,6 +813,7 @@ func (s *ServerConfig) Copy() *ServerConfig {
	ns.RaftTrailingLogs = pointer.Copy(s.RaftTrailingLogs)
	ns.JobDefaultPriority = pointer.Copy(s.JobDefaultPriority)
	ns.JobMaxPriority = pointer.Copy(s.JobMaxPriority)
	ns.JobMaxCount = pointer.Copy(s.JobMaxCount)
	ns.JobTrackedVersions = pointer.Copy(s.JobTrackedVersions)
	ns.ClientIntroduction = s.ClientIntroduction.Copy()
	return &ns
@@ -2505,6 +2509,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
	if b.JobMaxPriority != nil {
		result.JobMaxPriority = pointer.Of(*b.JobMaxPriority)
	}
	if b.JobMaxCount != nil {
		result.JobMaxCount = pointer.Of(*b.JobMaxCount)
	}
	if b.EvalGCThreshold != "" {
		result.EvalGCThreshold = b.EvalGCThreshold
	}


@@ -159,6 +159,7 @@ var basicConfig = &Config{
	LicensePath:        "/tmp/nomad.hclic",
	JobDefaultPriority: pointer.Of(100),
	JobMaxPriority:     pointer.Of(200),
	JobMaxCount:        pointer.Of(1000),
	StartTimeout:       "1m",
	ClientIntroduction: &ClientIntroduction{
		Enforcement: "warn",


@@ -388,6 +388,7 @@ func TestConfig_Merge(t *testing.T) {
	},
	JobMaxPriority:     pointer.Of(200),
	JobDefaultPriority: pointer.Of(100),
	JobMaxCount:        pointer.Of(1000),
	OIDCIssuer:         "https://oidc.test.nomadproject.io",
	StartTimeout:       "1m",
},


@@ -144,6 +144,7 @@ server {
  event_buffer_size    = 200
  job_default_priority = 100
  job_max_priority     = 200
  job_max_count        = 1000
  start_timeout        = "1m"

  plan_rejection_tracker {


@@ -370,6 +370,7 @@
      "license_path": "/tmp/nomad.hclic",
      "job_default_priority": 100,
      "job_max_priority": 200,
      "job_max_count": 1000,
      "start_timeout": "1m"
    }
  ],


@@ -433,6 +433,9 @@ type Config struct {
	// JobTrackedVersions is the number of historic Job versions that are kept.
	JobTrackedVersions int

	// JobMaxCount is the maximum total task group counts for a single Job.
	JobMaxCount int

	Reporting *config.ReportingConfig

	// OIDCIssuer is the URL for the OIDC Issuer field in Workload Identity JWTs.
@@ -664,6 +667,7 @@ func DefaultConfig() *Config {
	DeploymentQueryRateLimit: deploymentwatcher.LimitStateQueriesPerSecond,
	JobDefaultPriority:       structs.JobDefaultPriority,
	JobMaxPriority:           structs.JobDefaultMaxPriority,
	JobMaxCount:              structs.JobDefaultMaxCount,
	JobTrackedVersions:       structs.JobDefaultTrackedVersions,
	StartTimeout:             30 * time.Second,
	NodeIntroductionConfig:   structs.DefaultNodeIntroductionConfig(),


@@ -1040,8 +1040,19 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
		}
	}

	// Ensure that JobMaxCount is respected.
	newCount := int(*args.Count)
	totalCount := 0
	for _, tg := range job.TaskGroups {
		totalCount += tg.Count
	}
	totalCount = totalCount - group.Count + newCount
	if j.srv.config.JobMaxCount > 0 && totalCount > j.srv.config.JobMaxCount {
		return fmt.Errorf("total count was greater than configured job_max_count: %d > %d", totalCount, j.srv.config.JobMaxCount)
	}

	// Update group count
	group.Count = newCount
	job.SubmitTime = now

	// Block scaling event if there's an active deployment
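
The scale-time check recomputes what the job's total would be after the
change: it sums the current task group counts, subtracts the scaled group's
current count, and adds the requested count. For a job whose groups currently
total 7, scaling a group from 3 to 10 yields 7 - 3 + 10 = 14, and that total
is compared against job_max_count. A hypothetical jobspec sketch (job, group,
and image names are illustrative) showing that the limit applies to the sum
across groups rather than to any single group:

```hcl
# With job_max_count = 4 on the servers, this job registers fine
# (total count = 1 + 2 = 3), but scaling either group so that the sum
# exceeds 4 fails with "total count was greater than configured
# job_max_count".
job "example" {
  datacenters = ["dc1"]

  group "cache" {
    count = 1
    task "redis" {
      driver = "docker"
      config { image = "redis:7" }
    }
  }

  group "web" {
    count = 2
    task "nginx" {
      driver = "docker"
      config { image = "nginx:1.27" }
    }
  }
}
```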


@@ -521,7 +521,10 @@ func (v *jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
	okForIdentity := v.isEligibleForMultiIdentity()

	totalCount := 0
	for _, tg := range job.TaskGroups {
		totalCount += tg.Count

		for _, s := range tg.Services {
			serviceErrs := v.validateServiceIdentity(
				s, fmt.Sprintf("task group %s", tg.Name), okForIdentity)
@@ -543,6 +546,10 @@ func (v *jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
			warnings = append(warnings, vaultWarns...)
		}
	}

	if v.srv.config.JobMaxCount > 0 && totalCount > v.srv.config.JobMaxCount {
		err := fmt.Errorf("total count was greater than configured job_max_count: %d > %d", totalCount, v.srv.config.JobMaxCount)
		multierror.Append(validationErrors, err)
	}

	return warnings, validationErrors.ErrorOrNil()
}


@@ -16,6 +16,34 @@ import (
	"github.com/shoenig/test/must"
)

func Test_jobValidate_Validate(t *testing.T) {
	ci.Parallel(t)

	t.Run("error if task group count exceeds job_max_count", func(t *testing.T) {
		impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 10, JobMaxPriority: 100}}}
		job := mock.Job()
		job.TaskGroups[0].Count = 11

		_, err := impl.Validate(job)
		must.ErrorContains(t, err, "total count was greater than configured job_max_count: 11 > 10")
	})

	t.Run("no error if task group count equals job_max_count", func(t *testing.T) {
		impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 10, JobMaxPriority: 100}}}
		job := mock.Job()
		job.TaskGroups[0].Count = 10

		_, err := impl.Validate(job)
		must.NoError(t, err)
	})

	t.Run("no error if job_max_count is zero (i.e. unlimited)", func(t *testing.T) {
		impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 0, JobMaxPriority: 100}}}
		job := mock.Job()
		job.TaskGroups[0].Count = structs.JobDefaultMaxCount + 1

		_, err := impl.Validate(job)
		must.NoError(t, err)
	})
}

func Test_jobValidate_Validate_consul_service(t *testing.T) {
	ci.Parallel(t)


@@ -7683,7 +7683,7 @@ func TestJobEndpoint_Scale_Invalid(t *testing.T) {
	require.Contains(err.Error(), "should not contain count if error is true")
}

func TestJobEndpoint_Scale_TaskGroupOutOfBounds(t *testing.T) {
	ci.Parallel(t)
	require := require.New(t)
@@ -7726,6 +7726,44 @@ func TestJobEndpoint_Scale_OutOfBounds(t *testing.T) {
	require.Contains(err.Error(), "group count was less than scaling policy minimum: 2 < 3")
}

func TestJobEndpoint_Scale_JobOutOfBounds(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(config *Config) {
		config.JobMaxCount = 4
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)
	state := s1.fsm.State()

	const requestedCount = 6
	job := mock.Job()
	job.TaskGroups[0].Count = requestedCount

	// register the job
	err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)
	must.NoError(t, err)

	var resp structs.JobRegisterResponse
	scale := &structs.JobScaleRequest{
		JobID: job.ID,
		Target: map[string]string{
			structs.ScalingTargetGroup: job.TaskGroups[0].Name,
		},
		Count:          pointer.Of(int64(requestedCount)),
		Message:        "count too high",
		PolicyOverride: false,
		WriteRequest: structs.WriteRequest{
			Region:    "global",
			Namespace: job.Namespace,
		},
	}
	err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
	must.Error(t, err)
	must.ErrorContains(t, err, "total count was greater than configured job_max_count: 6 > 4")
}

func TestJobEndpoint_Scale_NoEval(t *testing.T) {
	ci.Parallel(t)
	require := require.New(t)


@@ -4327,6 +4327,9 @@ const (
	// JobMaxPriority is the maximum allowed configuration value for maximum job priority
	JobMaxPriority = math.MaxInt16 - 1

	// JobDefaultMaxCount is the default maximum total task group counts per job
	JobDefaultMaxCount = 50000

	// CoreJobPriority should be higher than any user
	// specified job so that it gets priority. This is important
	// for the system to remain healthy.


@@ -289,6 +289,14 @@ server {
- `job_default_priority` `(int: 50)` - Specifies the default priority assigned to a job.
  A valid value must be between `50` and `job_max_priority`.
- `job_max_count` `(int: 50000)` - Specifies the maximum number of allocations
  for a job, as represented by the sum of its task group `count` fields. Jobs
  of type `system` ignore this value, and the child jobs of dispatched batch
  jobs or periodic jobs are counted separately from their parent job. The
  value must be non-negative; setting it to `0` disables the limit. The limit
  is enforced when a job is submitted or scaled, so updating the value does
  not affect existing jobs.
- `job_max_source_size` `(string: "1M")` - Specifies the size limit of the associated
  job source content when registering a job. Note this is not a limit on the actual
  size of a job. If the limit is exceeded, the original source is simply discarded


@@ -38,6 +38,15 @@ being silently ignored. Any existing policies with duplicate or invalid keys
will continue to work, but the source policy document will need to be updated
to be valid before it can be written to Nomad.
#### Maximum number of allocations per job is limited by default

Nomad 1.11.0 limits the maximum number of allocations for a job to the value of
the new [`job_max_count`](/nomad/docs/configuration/server#job_max_count) server
configuration option, which defaults to 50000. The number of allocations is
determined from the sum of the job's task group `count` fields. The limit is
enforced when a job is submitted or scaled, so updating the value does not
affect existing jobs.
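
For clusters that legitimately run jobs above the default cap, a sketch of the
corresponding server stanza (the values shown are illustrative):

```hcl
server {
  enabled = true

  # Raise the cap above the largest job the cluster runs, or set it to 0
  # to disable the limit and keep pre-1.11 behavior.
  job_max_count = 200000
}
```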
## Nomad 1.10.6

#### ACL policies no longer silently ignore duplicate or invalid keys