From 26485c45a2fb4df27f0c1df920213d6d42a3bc66 Mon Sep 17 00:00:00 2001
From: Brendan MacDonell <210888555+bmacdonell-roblox@users.noreply.github.com>
Date: Mon, 6 Oct 2025 06:35:10 -0700
Subject: [PATCH] Add job_max_count option to keep Nomad server from running
 out of memory (#26858)

If a Nomad job is started with a large number of instances (e.g. 4
billion), the Nomad servers that attempt to schedule it will run out of
memory and crash. While it's unlikely that anyone would intentionally
schedule a job with 4 billion instances, we have occasionally run into
bugs in external automation. For example, an automated deployment
system running in a test environment had an off-by-one error and
deployed a job with count = uint32(-1), causing the Nomad servers for
that environment to run out of memory and crash.

To prevent this failure mode, this PR introduces a job_max_count Nomad
server configuration parameter. job_max_count limits the number of
allocs that may be created from a single job. The default value is
50000: low enough that a job with the maximum allowed number of allocs
will not require much memory on the server, yet much higher than the
number of allocs in the largest Nomad job we have ever run.
---
 .changelog/26858.txt                          |  3 +
 command/agent/agent.go                        |  9 +++
 command/agent/agent_test.go                   | 65 +++++++++++++++++++
 command/agent/config.go                       |  7 ++
 command/agent/config_parse_test.go            |  1 +
 command/agent/config_test.go                  |  1 +
 command/agent/testdata/basic.hcl              |  1 +
 command/agent/testdata/basic.json             |  1 +
 nomad/config.go                               |  4 ++
 nomad/job_endpoint.go                         | 13 +++-
 nomad/job_endpoint_hooks.go                   |  7 ++
 nomad/job_endpoint_hooks_test.go              | 28 ++++++++
 nomad/job_endpoint_test.go                    | 40 +++++++++++-
 nomad/structs/structs.go                      |  3 +
 website/content/docs/configuration/server.mdx |  8 +++
 .../content/docs/upgrade/upgrade-specific.mdx |  9 +++
 16 files changed, 198 insertions(+), 2 deletions(-)
 create mode 100644 .changelog/26858.txt

diff --git a/.changelog/26858.txt b/.changelog/26858.txt
new file mode 100644
index 000000000..1690ef30b
--- /dev/null
+++ b/.changelog/26858.txt
@@ -0,0 +1,3 @@
+```release-note:improvement
+config: Added job_max_count option to limit number of allocs for a single job
+```
diff --git a/command/agent/agent.go b/command/agent/agent.go
index c49b30e02..b797e4ed5 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -352,6 +352,15 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
 	conf.JobMaxPriority = jobMaxPriority
 	conf.JobDefaultPriority = jobDefaultPriority
 
+	jobMaxCount := structs.JobDefaultMaxCount
+	if agentConfig.Server.JobMaxCount != nil {
+		jobMaxCount = *agentConfig.Server.JobMaxCount
+		if jobMaxCount < 0 {
+			return nil, fmt.Errorf("job_max_count (%d) cannot be negative", jobMaxCount)
+		}
+	}
+	conf.JobMaxCount = jobMaxCount
+
 	if agentConfig.Server.JobTrackedVersions != nil {
 		if *agentConfig.Server.JobTrackedVersions <= 0 {
 			return nil, fmt.Errorf("job_tracked_versions must be greater than 0")
diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go
index cd233c85a..388a2a52e 100644
--- a/command/agent/agent_test.go
+++ b/command/agent/agent_test.go
@@ -1945,6 +1945,71 @@ func TestAgent_ServerConfig_JobDefaultPriority_Bad(t *testing.T) {
 	}
 }
 
+func TestAgent_ServerConfig_JobMaxCount(t *testing.T) {
+	ci.Parallel(t)
+
+	cases := []struct {
+		configured  *int
+		expected    int
+		expectedErr string
+	}{
+		{
+			configured:  nil,
+			expected:    structs.JobDefaultMaxCount,
+			expectedErr: "",
+		},
+		{
+			configured:  pointer.Of(1),
+			expected:    1,
+			expectedErr: "",
+		},
+		{
+			configured:  pointer.Of(0),
+			expected:    0,
+			expectedErr: "",
+		},
+		{
+			configured:  pointer.Of(structs.JobDefaultMaxCount),
+			expected:    structs.JobDefaultMaxCount,
+			expectedErr: "",
+		},
+		{
+			configured:  pointer.Of(2 * structs.JobDefaultMaxCount),
+			expected:    2 * structs.JobDefaultMaxCount,
+			expectedErr: "",
+		},
+		{
+			configured:  pointer.Of(-1),
+			expected:    0,
+			expectedErr: "job_max_count (-1) cannot be negative",
+		},
+		{
+			configured:  pointer.Of(-3),
+			expected:    0,
+			expectedErr: "job_max_count (-3) cannot be negative",
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(fmt.Sprint(tc.configured), func(t *testing.T) {
+			conf := DevConfig(nil)
+			must.NoError(t, conf.normalizeAddrs())
+
+			conf.Server.JobMaxCount = tc.configured
+
+			serverConf, err := convertServerConfig(conf)
+
+			if tc.expectedErr != "" {
+				must.Error(t, err)
+				must.ErrorContains(t, err, tc.expectedErr)
+			} else {
+				must.NoError(t, err)
+				must.Eq(t, tc.expected, serverConf.JobMaxCount)
+			}
+		})
+	}
+}
+
 func Test_convertServerConfig_clientIntroduction(t *testing.T) {
 	ci.Parallel(t)
 
diff --git a/command/agent/config.go b/command/agent/config.go
index ca46e0cce..8f8d1a815 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -761,6 +761,9 @@ type ServerConfig struct {
 	// JobMaxPriority is an upper bound on the Job priority.
 	JobMaxPriority *int `hcl:"job_max_priority"`
 
+	// JobMaxCount is an upper bound on the number of instances in a Job.
+	JobMaxCount *int `hcl:"job_max_count"`
+
 	// JobMaxSourceSize limits the maximum size of a jobs source hcl/json
 	// before being discarded automatically. If unset, the maximum size defaults
 	// to 1 MB. If the value is zero, no job sources will be stored.
@@ -810,6 +813,7 @@ func (s *ServerConfig) Copy() *ServerConfig {
 	ns.RaftTrailingLogs = pointer.Copy(s.RaftTrailingLogs)
 	ns.JobDefaultPriority = pointer.Copy(s.JobDefaultPriority)
 	ns.JobMaxPriority = pointer.Copy(s.JobMaxPriority)
+	ns.JobMaxCount = pointer.Copy(s.JobMaxCount)
 	ns.JobTrackedVersions = pointer.Copy(s.JobTrackedVersions)
 	ns.ClientIntroduction = s.ClientIntroduction.Copy()
 	return &ns
@@ -2505,6 +2509,9 @@ func (s *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
 	if b.JobMaxPriority != nil {
 		result.JobMaxPriority = pointer.Of(*b.JobMaxPriority)
 	}
+	if b.JobMaxCount != nil {
+		result.JobMaxCount = pointer.Of(*b.JobMaxCount)
+	}
 	if b.EvalGCThreshold != "" {
 		result.EvalGCThreshold = b.EvalGCThreshold
 	}
diff --git a/command/agent/config_parse_test.go b/command/agent/config_parse_test.go
index 526c9c088..021cacaf2 100644
--- a/command/agent/config_parse_test.go
+++ b/command/agent/config_parse_test.go
@@ -159,6 +159,7 @@ var basicConfig = &Config{
 		LicensePath:        "/tmp/nomad.hclic",
 		JobDefaultPriority: pointer.Of(100),
 		JobMaxPriority:     pointer.Of(200),
+		JobMaxCount:        pointer.Of(1000),
 		StartTimeout:       "1m",
 		ClientIntroduction: &ClientIntroduction{
 			Enforcement: "warn",
diff --git a/command/agent/config_test.go b/command/agent/config_test.go
index 6e1b38767..a8b19d1fd 100644
--- a/command/agent/config_test.go
+++ b/command/agent/config_test.go
@@ -388,6 +388,7 @@ func TestConfig_Merge(t *testing.T) {
 			},
 			JobMaxPriority:     pointer.Of(200),
 			JobDefaultPriority: pointer.Of(100),
+			JobMaxCount:        pointer.Of(1000),
 			OIDCIssuer:         "https://oidc.test.nomadproject.io",
 			StartTimeout:       "1m",
 		},
diff --git a/command/agent/testdata/basic.hcl b/command/agent/testdata/basic.hcl
index d41f7290e..cd004612f 100644
--- a/command/agent/testdata/basic.hcl
+++ b/command/agent/testdata/basic.hcl
@@ -144,6 +144,7 @@ server {
   event_buffer_size         = 200
   job_default_priority      = 100
   job_max_priority          = 200
+  job_max_count             = 1000
   start_timeout             = "1m"
 
   plan_rejection_tracker {
diff --git a/command/agent/testdata/basic.json b/command/agent/testdata/basic.json
index d557b3122..ce7b44477 100644
--- a/command/agent/testdata/basic.json
+++ b/command/agent/testdata/basic.json
@@ -370,6 +370,7 @@
       "license_path": "/tmp/nomad.hclic",
       "job_default_priority": 100,
       "job_max_priority": 200,
+      "job_max_count": 1000,
       "start_timeout": "1m"
     }
   ],
diff --git a/nomad/config.go b/nomad/config.go
index c4809d90e..9a1ab950e 100644
--- a/nomad/config.go
+++ b/nomad/config.go
@@ -433,6 +433,9 @@ type Config struct {
 	// JobTrackedVersions is the number of historic Job versions that are kept.
 	JobTrackedVersions int
 
+	// JobMaxCount is the maximum total task group count for a single Job.
+	JobMaxCount int
+
 	Reporting *config.ReportingConfig
 
 	// OIDCIssuer is the URL for the OIDC Issuer field in Workload Identity JWTs.
@@ -664,6 +667,7 @@ func DefaultConfig() *Config {
 		DeploymentQueryRateLimit: deploymentwatcher.LimitStateQueriesPerSecond,
 		JobDefaultPriority:       structs.JobDefaultPriority,
 		JobMaxPriority:           structs.JobDefaultMaxPriority,
+		JobMaxCount:              structs.JobDefaultMaxCount,
 		JobTrackedVersions:       structs.JobDefaultTrackedVersions,
 		StartTimeout:             30 * time.Second,
 		NodeIntroductionConfig:   structs.DefaultNodeIntroductionConfig(),
diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go
index c9a710722..9715fbec0 100644
--- a/nomad/job_endpoint.go
+++ b/nomad/job_endpoint.go
@@ -1040,8 +1040,19 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterResponse) error {
 		}
 	}
 
+	// Ensure that JobMaxCount is respected.
+	newCount := int(*args.Count)
+	totalCount := 0
+	for _, tg := range job.TaskGroups {
+		totalCount += tg.Count
+	}
+	totalCount = totalCount - group.Count + newCount
+	if j.srv.config.JobMaxCount > 0 && totalCount > j.srv.config.JobMaxCount {
+		return fmt.Errorf("total count was greater than configured job_max_count: %d > %d", totalCount, j.srv.config.JobMaxCount)
+	}
+
 	// Update group count
-	group.Count = int(*args.Count)
+	group.Count = newCount
 	job.SubmitTime = now
 
 	// Block scaling event if there's an active deployment
diff --git a/nomad/job_endpoint_hooks.go b/nomad/job_endpoint_hooks.go
index b1788dc2e..09841141b 100644
--- a/nomad/job_endpoint_hooks.go
+++ b/nomad/job_endpoint_hooks.go
@@ -521,7 +521,10 @@ func (v *jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
 
 	okForIdentity := v.isEligibleForMultiIdentity()
 
+	totalCount := 0
 	for _, tg := range job.TaskGroups {
+		totalCount += tg.Count
+
 		for _, s := range tg.Services {
 			serviceErrs := v.validateServiceIdentity(
 				s, fmt.Sprintf("task group %s", tg.Name), okForIdentity)
@@ -543,6 +546,10 @@ func (v *jobValidate) Validate(job *structs.Job) (warnings []error, err error) {
 			warnings = append(warnings, vaultWarns...)
 		}
 	}
 
+	if v.srv.config.JobMaxCount > 0 && totalCount > v.srv.config.JobMaxCount {
+		err := fmt.Errorf("total count was greater than configured job_max_count: %d > %d", totalCount, v.srv.config.JobMaxCount)
+		multierror.Append(validationErrors, err)
+	}
 	return warnings, validationErrors.ErrorOrNil()
 }
diff --git a/nomad/job_endpoint_hooks_test.go b/nomad/job_endpoint_hooks_test.go
index 2e4f8417c..a46f255f8 100644
--- a/nomad/job_endpoint_hooks_test.go
+++ b/nomad/job_endpoint_hooks_test.go
@@ -16,6 +16,34 @@ import (
 	"github.com/shoenig/test/must"
 )
 
+func Test_jobValidate_Validate(t *testing.T) {
+	ci.Parallel(t)
+
+	t.Run("error if task group count exceeds job_max_count", func(t *testing.T) {
+		impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 10, JobMaxPriority: 100}}}
+		job := mock.Job()
+		job.TaskGroups[0].Count = 11
+		_, err := impl.Validate(job)
+		must.ErrorContains(t, err, "total count was greater than configured job_max_count: 11 > 10")
+	})
+
+	t.Run("no error if task group count equals job_max_count", func(t *testing.T) {
+		impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 10, JobMaxPriority: 100}}}
+		job := mock.Job()
+		job.TaskGroups[0].Count = 10
+		_, err := impl.Validate(job)
+		must.NoError(t, err)
+	})
+
+	t.Run("no error if job_max_count is zero (i.e. unlimited)", func(t *testing.T) {
+		impl := jobValidate{srv: &Server{config: &Config{JobMaxCount: 0, JobMaxPriority: 100}}}
+		job := mock.Job()
+		job.TaskGroups[0].Count = structs.JobDefaultMaxCount + 1
+		_, err := impl.Validate(job)
+		must.NoError(t, err)
+	})
+}
+
 func Test_jobValidate_Validate_consul_service(t *testing.T) {
 	ci.Parallel(t)
 
diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go
index 9ff8d11b4..921060b11 100644
--- a/nomad/job_endpoint_test.go
+++ b/nomad/job_endpoint_test.go
@@ -7683,7 +7683,7 @@ func TestJobEndpoint_Scale_Invalid(t *testing.T) {
 	require.Contains(err.Error(), "should not contain count if error is true")
 }
 
-func TestJobEndpoint_Scale_OutOfBounds(t *testing.T) {
+func TestJobEndpoint_Scale_TaskGroupOutOfBounds(t *testing.T) {
 	ci.Parallel(t)
 	require := require.New(t)
 
@@ -7726,6 +7726,44 @@
 	require.Contains(err.Error(), "group count was less than scaling policy minimum: 2 < 3")
 }
 
+func TestJobEndpoint_Scale_JobOutOfBounds(t *testing.T) {
+	ci.Parallel(t)
+
+	s1, cleanupS1 := TestServer(t, func(config *Config) {
+		config.JobMaxCount = 4
+	})
+	defer cleanupS1()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+	state := s1.fsm.State()
+
+	const requestedCount = 6
+	job := mock.Job()
+	job.TaskGroups[0].Count = requestedCount
+
+	// register the job
+	err := state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)
+	must.NoError(t, err)
+
+	var resp structs.JobRegisterResponse
+	scale := &structs.JobScaleRequest{
+		JobID: job.ID,
+		Target: map[string]string{
+			structs.ScalingTargetGroup: job.TaskGroups[0].Name,
+		},
+		Count:          pointer.Of(int64(requestedCount)),
+		Message:        "count too high",
+		PolicyOverride: false,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+	err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
+	must.Error(t, err)
+	must.ErrorContains(t, err, "total count was greater than configured job_max_count: 6 > 4")
+}
+
 func TestJobEndpoint_Scale_NoEval(t *testing.T) {
 	ci.Parallel(t)
 	require := require.New(t)
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 71b87541e..d8cbf921c 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -4327,6 +4327,9 @@ const (
 	// JobMaxPriority is the maximum allowed configuration value for maximum job priority
 	JobMaxPriority = math.MaxInt16 - 1
 
+	// JobDefaultMaxCount is the default maximum total task group count per job
+	JobDefaultMaxCount = 50000
+
 	// CoreJobPriority should be higher than any user
 	// specified job so that it gets priority. This is important
 	// for the system to remain healthy.
diff --git a/website/content/docs/configuration/server.mdx b/website/content/docs/configuration/server.mdx
index a09f7598d..a1af96ddb 100644
--- a/website/content/docs/configuration/server.mdx
+++ b/website/content/docs/configuration/server.mdx
@@ -289,6 +289,14 @@ server {
 - `job_default_priority` `(int: 50)` - Specifies the default priority assigned
   to a job. A valid value must be between `50` and `job_max_priority`.
 
+- `job_max_count` `(int: 50000)` - Specifies the maximum number of allocations
+  for a job, as represented by the sum of its task group `count` fields. Jobs
+  of type `system` ignore this value. The child jobs of dispatched batch jobs
+  or periodic jobs are counted separately from their parent job. This value
+  must be non-negative; if set to `0`, no limit is enforced. The limit is
+  applied when a job is submitted or scaled, so changing it does not affect
+  jobs that are already running.
+
 - `job_max_source_size` `(string: "1M")` - Specifies the size limit of the associated
   job source content when registering a job. Note this is not a limit on the actual
   size of a job. If the limit is exceeded, the original source is simply discarded
diff --git a/website/content/docs/upgrade/upgrade-specific.mdx b/website/content/docs/upgrade/upgrade-specific.mdx
index da815a4e5..21717451c 100644
--- a/website/content/docs/upgrade/upgrade-specific.mdx
+++ b/website/content/docs/upgrade/upgrade-specific.mdx
@@ -38,6 +38,15 @@ being silently ignored. Any existing policies with duplicate or invalid keys
 will continue to work, but the source policy document will need to be updated
 to be valid before it can be written to Nomad.
 
+#### Maximum number of allocations per job is limited by default
+
+Nomad 1.11.0 limits the maximum number of allocations for a job to the value of
+the new [`job_max_count`](/nomad/docs/configuration/server#job_max_count) server
+configuration option, which defaults to 50000. The number of allocations is
+determined from the sum of the job's task group `count` fields. This limit is
+applied when a job is submitted or scaled, so changing the option does not
+affect jobs that are already running.
+
 ## Nomad 1.10.6
 
 #### ACL policies no longer silently ignore duplicate or invalid keys
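
For operators adopting the option, a minimal server configuration sketch
follows. The `server` block, the `enabled` field, and the `job_max_count` name
come from the patch itself; the value `100000` and the omission of all other
agent settings (data_dir, addresses, and so on) are illustrative assumptions,
not recommendations.

```hcl
server {
  enabled = true

  # Cap the sum of task group counts for any single job. A value of 0
  # disables the limit; negative values are rejected at agent startup
  # with "job_max_count (<n>) cannot be negative".
  job_max_count = 100000
}
```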
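
To illustrate the enforcement behavior, here is a hypothetical jobspec whose
group counts sum past the default limit; the job, group, task, and image names
are invented for the example, and only the `count` arithmetic and the error
text come from the patch.

```hcl
# The task group counts sum to 30000 + 25000 = 55000, so with the default
# job_max_count of 50000, submitting this job fails validation with:
#   total count was greater than configured job_max_count: 55000 > 50000
job "web" {
  datacenters = ["dc1"]

  group "frontend" {
    count = 30000

    task "app" {
      driver = "docker"

      config {
        image = "nginx:stable"
      }
    }
  }

  group "backend" {
    count = 25000

    task "api" {
      driver = "docker"

      config {
        image = "example/api:latest"
      }
    }
  }
}
```

The Scale RPC applies the same check: it recomputes the prospective total as
the current sum of all group counts, minus the scaled group's current count,
plus the requested count, and rejects the request when the result exceeds the
configured limit.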