From e40164abce43f83a7e95e9a1bf71a13a8193044f Mon Sep 17 00:00:00 2001 From: Allison Larson Date: Thu, 2 Oct 2025 13:56:59 -0700 Subject: [PATCH] Add preserve-resources flag (#26841) * Add preserve-resources flag when registering a job * Add preserve-resources flag to website docs * Add changelog * Update tests, docs * Preserve counts & resources in fsm * Update doc * Update preservation of resources/count to happen in StateStore --- .changelog/26841.txt | 3 + api/jobs.go | 23 +++--- api/jobs_test.go | 112 ++++++++++++++++++++++++++ command/agent/job_endpoint.go | 13 +-- command/job_run.go | 44 +++++----- nomad/fsm.go | 2 +- nomad/job_endpoint.go | 13 --- nomad/job_endpoint_test.go | 69 ++++++++++++++++ nomad/state/deployment_events_test.go | 2 +- nomad/state/events_test.go | 6 +- nomad/state/state_store.go | 68 ++++++++++++++-- nomad/state/state_store_test.go | 78 ++++++++++++++++++ nomad/structs/structs.go | 5 ++ website/content/api-docs/jobs.mdx | 3 + website/content/commands/job/run.mdx | 7 +- 15 files changed, 387 insertions(+), 61 deletions(-) create mode 100644 .changelog/26841.txt diff --git a/.changelog/26841.txt b/.changelog/26841.txt new file mode 100644 index 000000000..0347d2dce --- /dev/null +++ b/.changelog/26841.txt @@ -0,0 +1,3 @@ +```release-note:improvement +cli: Add -preserve-resources flag for keeping resource block when updating jobs +``` diff --git a/api/jobs.go b/api/jobs.go index 59dc0ea9a..545519acf 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -120,12 +120,13 @@ func (j *Jobs) Validate(job *Job, q *WriteOptions) (*JobValidateResponse, *Write // RegisterOptions is used to pass through job registration parameters type RegisterOptions struct { - EnforceIndex bool - ModifyIndex uint64 - PolicyOverride bool - PreserveCounts bool - EvalPriority int - Submission *JobSubmission + EnforceIndex bool + ModifyIndex uint64 + PolicyOverride bool + PreserveCounts bool + PreserveResources bool + EvalPriority int + Submission *JobSubmission } // Register 
is used to register a new job. It returns the ID @@ -152,6 +153,7 @@ func (j *Jobs) RegisterOpts(job *Job, opts *RegisterOptions, q *WriteOptions) (* } req.PolicyOverride = opts.PolicyOverride req.PreserveCounts = opts.PreserveCounts + req.PreserveResources = opts.PreserveResources req.EvalPriority = opts.EvalPriority req.Submission = opts.Submission } @@ -1486,10 +1488,11 @@ type JobRegisterRequest struct { // If EnforceIndex is set then the job will only be registered if the passed // JobModifyIndex matches the current Jobs index. If the index is zero, the // register only occurs if the job is new. - EnforceIndex bool `json:",omitempty"` - JobModifyIndex uint64 `json:",omitempty"` - PolicyOverride bool `json:",omitempty"` - PreserveCounts bool `json:",omitempty"` + EnforceIndex bool `json:",omitempty"` + JobModifyIndex uint64 `json:",omitempty"` + PolicyOverride bool `json:",omitempty"` + PreserveCounts bool `json:",omitempty"` + PreserveResources bool `json:",omitempty"` // EvalPriority is an optional priority to use on any evaluation created as // a result on this job registration. This value must be between 1-100 diff --git a/api/jobs_test.go b/api/jobs_test.go index ef2e4620c..488e3e708 100644 --- a/api/jobs_test.go +++ b/api/jobs_test.go @@ -186,6 +186,118 @@ func TestJobs_Register_NoPreserveCounts(t *testing.T) { must.Eq(t, 3, status.TaskGroups["group3"].Desired) // new => as specified } +func TestJobs_Register_PreserveResources(t *testing.T) { + testutil.Parallel(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Listing jobs before registering returns nothing + resp, _, err := jobs.List(nil) + must.NoError(t, err) + must.SliceEmpty(t, resp) + + // Create a job + task := NewTask("task", "exec"). + SetConfig("command", "/bin/echo"). + SetLogConfig(&LogConfig{ + MaxFiles: pointerOf(1), + MaxFileSizeMB: pointerOf(2), + }) + + group1 := NewTaskGroup("group1", 1). + AddTask(task). 
+ RequireDisk(&EphemeralDisk{ + SizeMB: pointerOf(25), + }) + + job := NewBatchJob("job", "redis", "global", 1). + AddDatacenter("dc1"). + AddTaskGroup(group1) + + // Create a job and register it + resp2, wm, err := jobs.Register(job, nil) + must.NoError(t, err) + must.NotNil(t, resp2) + must.UUIDv4(t, resp2.EvalID) + assertWriteMeta(t, wm) + + // Update the task resources to test PreserveResources + task.Resources = &Resources{ + CPU: pointerOf(50), + MemoryMB: pointerOf(128), + } + + // Update the job, with PreserveResources = true + _, _, err = jobs.RegisterOpts(job, &RegisterOptions{ + PreserveResources: true, + }, nil) + must.NoError(t, err) + + // Query the job info + registered, _, err := jobs.Info(*job.ID, nil) + must.NoError(t, err) + must.Eq(t, 100, *registered.TaskGroups[0].Tasks[0].Resources.CPU) // preserved + must.Eq(t, 300, *registered.TaskGroups[0].Tasks[0].Resources.MemoryMB) // preserved +} + +func TestJobs_Register_NoPreserveResources(t *testing.T) { + testutil.Parallel(t) + + c, s := makeClient(t, nil, nil) + defer s.Stop() + jobs := c.Jobs() + + // Listing jobs before registering returns nothing + resp, _, err := jobs.List(nil) + must.NoError(t, err) + must.SliceEmpty(t, resp) + + // Create a job + task := NewTask("task", "exec"). + SetConfig("command", "/bin/echo"). + SetLogConfig(&LogConfig{ + MaxFiles: pointerOf(1), + MaxFileSizeMB: pointerOf(2), + }) + + group1 := NewTaskGroup("group1", 1). + AddTask(task). + RequireDisk(&EphemeralDisk{ + SizeMB: pointerOf(25), + }) + + job := NewBatchJob("job", "redis", "global", 1). + AddDatacenter("dc1"). 
+ AddTaskGroup(group1) + + // Create a job and register it + resp2, wm, err := jobs.Register(job, nil) + must.NoError(t, err) + must.NotNil(t, resp2) + must.UUIDv4(t, resp2.EvalID) + assertWriteMeta(t, wm) + + // Update the task resources to test PreserveResources + task.Resources = &Resources{ + CPU: pointerOf(50), + MemoryMB: pointerOf(128), + } + + // Update the job, with PreserveResources = false + _, _, err = jobs.RegisterOpts(job, &RegisterOptions{ + PreserveResources: false, + }, nil) + must.NoError(t, err) + + // Query the job info + registered, _, err := jobs.Info(*job.ID, nil) + must.NoError(t, err) + must.Eq(t, 50, *registered.TaskGroups[0].Tasks[0].Resources.CPU) // updated + must.Eq(t, 128, *registered.TaskGroups[0].Tasks[0].Resources.MemoryMB) // updated +} + func TestJobs_Register_EvalPriority(t *testing.T) { testutil.Parallel(t) diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index b31ea5e21..df055515e 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -603,12 +603,13 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, jobI Job: sJob, Submission: submission, - EnforceIndex: args.EnforceIndex, - JobModifyIndex: args.JobModifyIndex, - PolicyOverride: args.PolicyOverride, - PreserveCounts: args.PreserveCounts, - EvalPriority: args.EvalPriority, - WriteRequest: *writeReq, + EnforceIndex: args.EnforceIndex, + JobModifyIndex: args.JobModifyIndex, + PolicyOverride: args.PolicyOverride, + PreserveCounts: args.PreserveCounts, + PreserveResources: args.PreserveResources, + EvalPriority: args.EvalPriority, + WriteRequest: *writeReq, } var out structs.JobRegisterResponse diff --git a/command/job_run.go b/command/job_run.go index 21a3b1623..aceb22ed8 100644 --- a/command/job_run.go +++ b/command/job_run.go @@ -106,6 +106,9 @@ Run Options: -preserve-counts If set, the existing task group counts will be preserved when updating a job. 
+ -preserve-resources + If set, the existing task resources will be preserved when updating a job. + -consul-namespace (Enterprise only) If set, any services in the job will be registered into the specified Consul namespace. Any template block reading from Consul KV @@ -137,20 +140,21 @@ func (c *JobRunCommand) Synopsis() string { func (c *JobRunCommand) AutocompleteFlags() complete.Flags { return mergeAutocompleteFlags(c.Meta.AutocompleteFlags(FlagSetClient), complete.Flags{ - "-check-index": complete.PredictNothing, - "-detach": complete.PredictNothing, - "-verbose": complete.PredictNothing, - "-consul-namespace": complete.PredictAnything, - "-vault-namespace": complete.PredictAnything, - "-output": complete.PredictNothing, - "-policy-override": complete.PredictNothing, - "-preserve-counts": complete.PredictNothing, - "-json": complete.PredictNothing, - "-hcl2-strict": complete.PredictNothing, - "-var": complete.PredictAnything, - "-var-file": complete.PredictFiles("*.var"), - "-eval-priority": complete.PredictNothing, - "-ui": complete.PredictNothing, + "-check-index": complete.PredictNothing, + "-detach": complete.PredictNothing, + "-verbose": complete.PredictNothing, + "-consul-namespace": complete.PredictAnything, + "-vault-namespace": complete.PredictAnything, + "-output": complete.PredictNothing, + "-policy-override": complete.PredictNothing, + "-preserve-counts": complete.PredictNothing, + "-preserve-resources": complete.PredictNothing, + "-json": complete.PredictNothing, + "-hcl2-strict": complete.PredictNothing, + "-var": complete.PredictAnything, + "-var-file": complete.PredictFiles("*.var"), + "-eval-priority": complete.PredictNothing, + "-ui": complete.PredictNothing, }) } @@ -165,7 +169,7 @@ func (c *JobRunCommand) AutocompleteArgs() complete.Predictor { func (c *JobRunCommand) Name() string { return "job run" } func (c *JobRunCommand) Run(args []string) int { - var detach, verbose, output, override, preserveCounts, openURL bool + var detach, 
verbose, output, override, preserveCounts, preserveResources, openURL bool var checkIndexStr, consulNamespace, vaultNamespace string var evalPriority int @@ -176,6 +180,7 @@ func (c *JobRunCommand) Run(args []string) int { flagSet.BoolVar(&output, "output", false, "") flagSet.BoolVar(&override, "policy-override", false, "") flagSet.BoolVar(&preserveCounts, "preserve-counts", false, "") + flagSet.BoolVar(&preserveResources, "preserve-resources", false, "") flagSet.BoolVar(&c.JobGetter.JSON, "json", false, "") flagSet.BoolVar(&c.JobGetter.Strict, "hcl2-strict", true, "") flagSet.StringVar(&checkIndexStr, "check-index", "", "") @@ -272,10 +277,11 @@ func (c *JobRunCommand) Run(args []string) int { // Set the register options opts := &api.RegisterOptions{ - PolicyOverride: override, - PreserveCounts: preserveCounts, - EvalPriority: evalPriority, - Submission: sub, + PolicyOverride: override, + PreserveCounts: preserveCounts, + PreserveResources: preserveResources, + EvalPriority: evalPriority, + Submission: sub, } if enforce { opts.EnforceIndex = true diff --git a/nomad/fsm.go b/nomad/fsm.go index 89b813c98..dc80b544d 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -634,7 +634,7 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index */ req.Job.Canonicalize() - if err := n.state.UpsertJob(msgType, index, req.Submission, req.Job); err != nil { + if err := n.state.UpsertJobWithRequest(msgType, index, &req); err != nil { n.logger.Error("UpsertJob failed", "error", err) return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index fc47a3e6e..c9a710722 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -286,19 +286,6 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } - // Preserve the existing task group counts, if so requested - if existingJob != nil && args.PreserveCounts { - prevCounts := make(map[string]int) - for _, tg := range existingJob.TaskGroups { - prevCounts[tg.Name] 
= tg.Count - } - for _, tg := range args.Job.TaskGroups { - if count, ok := prevCounts[tg.Name]; ok { - tg.Count = count - } - } - } - // Submit a multiregion job to other regions (enterprise only). // The job will have its region interpolated. var newVersion uint64 diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index ec0404d3d..9ff8d11b4 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -356,6 +356,75 @@ func TestJobEndpoint_Register_PreserveCounts(t *testing.T) { require.Equal(2, out.TaskGroups[1].Count) // should be as in job spec } +func TestJobEndpoint_Register_PreserveResources(t *testing.T) { + ci.Parallel(t) + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + job := mock.Job() + job.TaskGroups[0].Name = "group1" + job.TaskGroups = append(job.TaskGroups, job.TaskGroups[0].Copy()) + job.TaskGroups[1].Name = "group2" + job.TaskGroups[1].Tasks[0].Resources = &structs.Resources{ + CPU: 300, + MemoryMB: 128, + } + job.Canonicalize() + + // Register the job + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + }, &structs.JobRegisterResponse{})) + + // Check the job in the FSM state + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + must.NoError(t, err) + must.NotNil(t, out) + must.Eq(t, 10, out.TaskGroups[0].Count) + + // New version: + job = job.Copy() + task := job.TaskGroups[0].Tasks[0] + task.Resources.CPU = 200 + task.Resources.MemoryMB = 400 + + job.TaskGroups[1].Tasks[0].Resources = &structs.Resources{ + CPU: 250, + MemoryMB: 64, + } + + // Perform the update + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", &structs.JobRegisterRequest{ + Job: 
job, + PreserveResources: true, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + }, &structs.JobRegisterResponse{})) + + // Check the job in the FSM state + out, err = state.JobByID(nil, job.Namespace, job.ID) + must.NoError(t, err) + must.NotNil(t, out) + must.Eq(t, 500, out.TaskGroups[0].Tasks[0].Resources.CPU) // should not change + must.Eq(t, 256, out.TaskGroups[0].Tasks[0].Resources.MemoryMB) // should not change + + must.Eq(t, 300, out.TaskGroups[1].Tasks[0].Resources.CPU) // should not change + must.Eq(t, 128, out.TaskGroups[1].Tasks[0].Resources.MemoryMB) // should not change +} + func TestJobEndpoint_Register_EvalPriority(t *testing.T) { ci.Parallel(t) requireAssert := require.New(t) diff --git a/nomad/state/deployment_events_test.go b/nomad/state/deployment_events_test.go index 0dfec2b15..318a60a8f 100644 --- a/nomad/state/deployment_events_test.go +++ b/nomad/state/deployment_events_test.go @@ -30,7 +30,7 @@ func TestDeploymentEventFromChanges(t *testing.T) { d := mock.Deployment() d.JobID = j.ID - must.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx)) + must.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx, nil)) must.NoError(t, s.upsertDeploymentImpl(10, d, setupTx)) setupTx.Txn.Commit() diff --git a/nomad/state/events_test.go b/nomad/state/events_test.go index 3ce793ca0..7559dff31 100644 --- a/nomad/state/events_test.go +++ b/nomad/state/events_test.go @@ -113,7 +113,7 @@ func TestEventsFromChanges_DeploymentUpdate(t *testing.T) { d := mock.Deployment() d.JobID = j.ID - must.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx)) + must.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx, nil)) must.NoError(t, s.upsertDeploymentImpl(10, d, setupTx)) setupTx.Txn.Commit() @@ -157,7 +157,7 @@ func TestEventsFromChanges_DeploymentPromotion(t *testing.T) { tg2 := tg1.Copy() tg2.Name = "foo" j.TaskGroups = append(j.TaskGroups, tg2) - must.NoError(t, s.upsertJobImpl(10, nil, j, false, 
setupTx)) + must.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx, nil)) d := mock.Deployment() d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion @@ -234,7 +234,7 @@ func TestEventsFromChanges_DeploymentAllocHealthRequestType(t *testing.T) { tg2 := tg1.Copy() tg2.Name = "foo" j.TaskGroups = append(j.TaskGroups, tg2) - must.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx)) + must.NoError(t, s.upsertJobImpl(10, nil, j, false, setupTx, nil)) d := mock.Deployment() d.StatusDescription = structs.DeploymentStatusDescriptionRunningNeedsPromotion diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index d59e63493..d3d2df609 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1750,7 +1750,7 @@ func (s *StateStore) Nodes(ws memdb.WatchSet) (memdb.ResultIterator, error) { func (s *StateStore) UpsertJob(msgType structs.MessageType, index uint64, sub *structs.JobSubmission, job *structs.Job) error { txn := s.db.WriteTxnMsgT(msgType, index) defer txn.Abort() - if err := s.upsertJobImpl(index, sub, job, false, txn); err != nil { + if err := s.upsertJobImpl(index, sub, job, false, txn, nil); err != nil { return err } return txn.Commit() @@ -1759,11 +1759,23 @@ func (s *StateStore) UpsertJob(msgType structs.MessageType, index uint64, sub *s // UpsertJobTxn is used to register a job or update a job definition, like UpsertJob, // but in a transaction. Useful for when making multiple modifications atomically func (s *StateStore) UpsertJobTxn(index uint64, sub *structs.JobSubmission, job *structs.Job, txn Txn) error { - return s.upsertJobImpl(index, sub, job, false, txn) + return s.upsertJobImpl(index, sub, job, false, txn, nil) +} + +// UpsertJobWithRequest is used to register a job or update a job definition +// using the JobRegisterRequest. 
It allows flags to be set and used within the +// upsert job action +func (s *StateStore) UpsertJobWithRequest(msgType structs.MessageType, index uint64, req *structs.JobRegisterRequest) error { + txn := s.db.WriteTxnMsgT(msgType, index) + defer txn.Abort() + if err := s.upsertJobImpl(index, req.Submission, req.Job, false, txn, req); err != nil { + return err + } + return txn.Commit() } // upsertJobImpl is the implementation for registering a job or updating a job definition -func (s *StateStore) upsertJobImpl(index uint64, sub *structs.JobSubmission, job *structs.Job, keepVersion bool, txn *txn) error { +func (s *StateStore) upsertJobImpl(index uint64, sub *structs.JobSubmission, job *structs.Job, keepVersion bool, txn *txn, req *structs.JobRegisterRequest) error { // Assert the namespace exists if exists, err := s.namespaceExists(txn, job.Namespace); err != nil { return err @@ -1860,6 +1872,10 @@ func (s *StateStore) upsertJobImpl(index uint64, sub *structs.JobSubmission, job return fmt.Errorf("unable to update job submission: %v", err) } + if err := s.updatePreservedValues(job, existingJob, req); err != nil { + return fmt.Errorf("unable to update preserved values: %v", err) + } + // Insert the job if err := txn.Insert("jobs", job); err != nil { return fmt.Errorf("job insert failed: %v", err) @@ -4707,7 +4723,7 @@ func (s *StateStore) UpdateDeploymentStatus(msgType structs.MessageType, index u // On failed deployments with auto_revert set to true, a new eval and job will be included on the request. 
// We should upsert them both if req.Job != nil { - if err := s.upsertJobImpl(index, nil, req.Job, false, txn); err != nil { + if err := s.upsertJobImpl(index, nil, req.Job, false, txn, nil); err != nil { return err } } @@ -4793,7 +4809,7 @@ func (s *StateStore) updateJobStabilityImpl(index uint64, namespace, jobID strin copy := job.Copy() copy.Stable = stable - return s.upsertJobImpl(index, nil, copy, true, txn) + return s.upsertJobImpl(index, nil, copy, true, txn, nil) } func (s *StateStore) UpdateJobVersionTag(index uint64, namespace string, req *structs.JobApplyTagRequest) error { @@ -5115,7 +5131,7 @@ func (s *StateStore) UpdateDeploymentAllocHealth(msgType structs.MessageType, in // Upsert the job if necessary if req.Job != nil { - if err := s.upsertJobImpl(index, nil, req.Job, false, txn); err != nil { + if err := s.upsertJobImpl(index, nil, req.Job, false, txn, nil); err != nil { return err } } @@ -5592,6 +5608,46 @@ func (s *StateStore) updateSummaryWithJob(index uint64, job *structs.Job, return nil } +// updatePreservedValues preserves the existing task group counts and resources, +// if requested. This avoids race conditions when registering and scaling jobs. 
+func (s *StateStore) updatePreservedValues(job *structs.Job, prev *structs.Job, req *structs.JobRegisterRequest) error { + if req == nil || prev == nil { + return nil + } + if req.PreserveCounts || req.PreserveResources { + if req.PreserveCounts { + prevCounts := make(map[string]int) + for _, tg := range prev.TaskGroups { + prevCounts[tg.Name] = tg.Count + } + for _, tg := range job.TaskGroups { + if count, ok := prevCounts[tg.Name]; ok { + tg.Count = count + } + } + } + + if req.PreserveResources { + prevResources := make(map[string]map[string]*structs.Resources) + for _, tg := range prev.TaskGroups { + prevResources[tg.Name] = make(map[string]*structs.Resources) + for _, task := range tg.Tasks { + prevResources[tg.Name][task.Name] = task.Resources + } + } + for _, tg := range job.TaskGroups { + for _, task := range tg.Tasks { + if res, ok := prevResources[tg.Name][task.Name]; ok { + task.Resources = res.Copy() + } + } + } + } + } + + return nil +} + // updateJobScalingPolicies upserts any scaling policies contained in the job and removes // any previous scaling policies that were removed from the job func (s *StateStore) updateJobScalingPolicies(index uint64, job *structs.Job, txn *txn) error { diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index 492062077..189603269 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2535,6 +2535,84 @@ func TestStateStore_UpdateUpsertJob_JobVersion(t *testing.T) { must.False(t, watchFired(ws), must.Sprint("watch should not have fired")) } +func TestStateStore_UpsertJobWithRequest(t *testing.T) { + ci.Parallel(t) + + state := testStateStore(t) + job := mock.Job() + + // Create a watchset so we can test that upsert fires the watch + ws := memdb.NewWatchSet() + _, err := state.JobByID(ws, job.Namespace, job.ID) + must.NoError(t, err) + + must.NoError(t, state.UpsertJobWithRequest(structs.MsgTypeTestSetup, 1000, &structs.JobRegisterRequest{Job: job})) + must.True(t, 
watchFired(ws), must.Sprint("expected watch to fire")) + + ws = memdb.NewWatchSet() + out, err := state.JobByID(ws, job.Namespace, job.ID) + must.NoError(t, err) + must.Eq(t, job, out) + + index, err := state.Index("jobs") + must.NoError(t, err) + must.Eq(t, 1000, index) + + summary, err := state.JobSummaryByID(ws, job.Namespace, job.ID) + must.NoError(t, err) + must.NotNil(t, summary) + must.Eq(t, job.ID, summary.JobID, must.Sprint("bad summary id")) + _, ok := summary.Summary["web"] + must.True(t, ok, must.Sprint("nil summary for task group")) + must.False(t, watchFired(ws), must.Sprint("watch should not have fired")) + + // Check the job versions + allVersions, err := state.JobVersionsByID(ws, job.Namespace, job.ID) + must.NoError(t, err) + must.Len(t, 1, allVersions) + + a := allVersions[0] + must.Eq(t, a.ID, job.ID) + must.Eq(t, a.Version, 0) + + // Test the looking up the job by version returns the same results + vout, err := state.JobByIDAndVersion(ws, job.Namespace, job.ID, 0) + must.NoError(t, err) + must.Eq(t, out, vout) +} + +func TestStateStore_UpsertJobWithRequest_PreserveCount(t *testing.T) { + ci.Parallel(t) + + state := testStateStore(t) + + // Create a job + job := mock.Job() + job.TaskGroups[0].Count = 10 + job.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 500, + MemoryMB: 256, + } + + must.NoError(t, state.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)) + + job2 := job.Copy() + job2.TaskGroups[0].Count = 5 + job2.TaskGroups[0].Tasks[0].Resources = &structs.Resources{ + CPU: 750, + MemoryMB: 500, + } + + must.NoError(t, state.UpsertJobWithRequest(structs.MsgTypeTestSetup, 1001, &structs.JobRegisterRequest{PreserveCounts: true, PreserveResources: true, Job: job2})) + + out, err := state.JobByID(nil, job.Namespace, job.ID) + must.NoError(t, err) + + must.Eq(t, 10, out.TaskGroups[0].Count) + must.Eq(t, out.TaskGroups[0].Tasks[0].Resources.CPU, 500) + must.Eq(t, out.TaskGroups[0].Tasks[0].Resources.MemoryMB, 256) +} + func 
TestStateStore_DeleteJob_Job(t *testing.T) { ci.Parallel(t) diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 99bdb2bfd..71b87541e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -737,6 +737,11 @@ type JobRegisterRequest struct { // PreserveCounts is ignored for newly created jobs. PreserveCounts bool + // PreserveResources indicates that during job update, existing task + // resources should be preserved, over those specified in the new job spec + // PreserveResources is ignored for newly created jobs. + PreserveResources bool + // PolicyOverride is set when the user is attempting to override any policies PolicyOverride bool diff --git a/website/content/api-docs/jobs.mdx b/website/content/api-docs/jobs.mdx index e36ae6e1d..391434c2f 100644 --- a/website/content/api-docs/jobs.mdx +++ b/website/content/api-docs/jobs.mdx @@ -145,6 +145,9 @@ The table below shows this endpoint's support for - `PreserveCounts` `(bool: false)` - If set, existing task group counts are preserved, over those specified in the new job spec. +- `PreserveResources` `(bool: false)` - If set, existing task resources are + preserved, over those specified in the new job spec. + ### Sample Payload ```json diff --git a/website/content/commands/job/run.mdx b/website/content/commands/job/run.mdx index d9654cb3b..099878fef 100644 --- a/website/content/commands/job/run.mdx +++ b/website/content/commands/job/run.mdx @@ -76,8 +76,11 @@ that volume. - `-policy-override`: Sets the flag to force override any soft mandatory Sentinel policies. -- `-preserve-counts`: If set, the existing task group counts will be preserved - when updating a job. +- `-preserve-counts`: If set, the existing task group counts are preserved when + updating a job. + +- `-preserve-resources`: If set, the existing task resources are preserved when + updating a job, so that autoscaling changes are not overwritten. 
- `-consul-namespace`: If set, any services in the job will be registered into the specified Consul namespace. Any `template` block reading from Consul KV will