From 0e9eb5ae43fd4e12d69b5734029eb89790230460 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Fri, 5 Sep 2025 14:53:08 -0400 Subject: [PATCH] dispatch: write evaluation atomically with dispatch registration (#26710) In #8435 (shipped in 0.12.1), we updated the `Job.Register` RPC to atomically write the eval along with the job. But this didn't get copied to `Job.Dispatch`. Under excessive load testing we demonstrated this can result in dispatched jobs without corresponding evals. Update the dispatch RPC to write the eval in the same Raft log as the job registration. Note that we don't need to version-check this change for upgrades, because the register and dispatch RPCs share the same `JobRegisterRequestType` Raft message, and therefore all supported server versions already look for the eval in the FSM. If an updated leader includes the eval, older followers will write the eval. If a non-updated leader writes the eval in a separate Raft entry, updated followers will write those evals normally. Fixes: https://github.com/hashicorp/nomad/issues/26655 Ref: https://hashicorp.atlassian.net/browse/NMD-947 Ref: https://github.com/hashicorp/nomad/pull/8435 --- .changelog/26710.txt | 3 +++ nomad/fsm.go | 4 +-- nomad/job_endpoint.go | 52 ++++++++++++++---------------------- nomad/job_endpoint_test.go | 54 +++++++++++++++++--------------------- 4 files changed, 49 insertions(+), 64 deletions(-) create mode 100644 .changelog/26710.txt diff --git a/.changelog/26710.txt b/.changelog/26710.txt new file mode 100644 index 000000000..8348a010a --- /dev/null +++ b/.changelog/26710.txt @@ -0,0 +1,3 @@ +```release-note:bug +dispatch: Fixed a bug where evaluations were not created atomically with dispatched jobs, which could prevent dispatch jobs from creating allocations +``` diff --git a/nomad/fsm.go b/nomad/fsm.go index 2b8817b77..c760b5caf 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -734,8 +734,8 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index } } - // COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log, - // so this may be nil during server upgrades. + // Not all job registrations will include an eval (ex. registering a + // dispatch/periodic job) if req.Eval != nil { req.Eval.JobModifyIndex = index diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index e69685935..fc47a3e6e 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -2107,12 +2107,29 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa // Compress the payload dispatchJob.Payload = snappy.Encode(nil, args.Payload) + // If the job is periodic, we don't create an eval. + var eval *structs.Evaluation + if !dispatchJob.IsPeriodic() { + now := time.Now().UnixNano() + eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: dispatchJob.Priority, + Type: dispatchJob.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: dispatchJob.ID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + } + regReq := &structs.JobRegisterRequest{ Job: dispatchJob, WriteRequest: args.WriteRequest, + Eval: eval, } - // Commit this update via Raft _, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq) if err != nil { j.logger.Error("dispatched job register failed", "error", err) @@ -2123,38 +2140,9 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa reply.DispatchedJobID = dispatchJob.ID reply.Index = jobCreateIndex - // If the job is periodic, we don't create an eval. - if !dispatchJob.IsPeriodic() { - // Create a new evaluation - now := time.Now().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: dispatchJob.Priority, - Type: dispatchJob.Type, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: dispatchJob.ID, - JobModifyIndex: jobCreateIndex, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } - - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - j.logger.Error("eval create failed", "error", err, "method", "dispatch") - return err - } - - // Setup the reply + if eval != nil { reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex + reply.EvalCreateIndex = jobCreateIndex } return nil diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index cb3de1266..ec0404d3d 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -1385,7 +1385,6 @@ func TestJobEndpoint_Register_ParameterizedJob(t *testing.T) { func TestJobEndpoint_Register_Dispatched(t *testing.T) { ci.Parallel(t) - require := require.New(t) s1, cleanupS1 := TestServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -1409,8 +1408,7 @@ func TestJobEndpoint_Register_Dispatched(t *testing.T) { // Fetch the response var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) - require.Error(err) - require.Contains(err.Error(), "job can't be submitted with 'Dispatched'") + must.EqError(t, err, "job can't be submitted with 'Dispatched' set") } func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { @@ -6532,7 +6530,6 @@ func TestJobEndpoint_ValidateJob_PriorityNotOk(t *testing.T) { func TestJobEndpoint_Dispatch_ACL(t *testing.T) { ci.Parallel(t) - require := require.New(t) s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -6540,13 +6537,13 @@ func TestJobEndpoint_Dispatch_ACL(t *testing.T) { defer cleanupS1() codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) - state := s1.fsm.State() + store := s1.fsm.State() // Create a parameterized job job := mock.BatchJob() job.ParameterizedJob = &structs.ParameterizedJobConfig{} - err := state.UpsertJob(structs.MsgTypeTestSetup, 400, nil, job) - require.Nil(err) + err := store.UpsertJob(structs.MsgTypeTestSetup, 400, nil, job) + must.NoError(t, err) req := &structs.JobDispatchRequest{ JobID: job.ID, @@ -6559,52 +6556,49 @@ func TestJobEndpoint_Dispatch_ACL(t *testing.T) { // Attempt to fetch the response without a token should fail var resp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &resp) - require.NotNil(err) - require.Contains(err.Error(), "Permission denied") + must.EqError(t, err, "Permission denied") // Attempt to fetch the response with an invalid token should fail - invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", + invalidToken := mock.CreatePolicyAndToken(t, store, 1001, "test-invalid", mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs})) req.AuthToken = invalidToken.SecretID var invalidResp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &invalidResp) - require.NotNil(err) - require.Contains(err.Error(), "Permission denied") + must.EqError(t, err, "Permission denied") // Dispatch with a valid management token should succeed req.AuthToken = root.SecretID var validResp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &validResp) - require.Nil(err) - require.NotNil(validResp.EvalID) - require.NotNil(validResp.DispatchedJobID) - require.NotEqual(validResp.DispatchedJobID, "") + must.NoError(t, err) + must.NotEq(t, "", validResp.EvalID) + must.NotEq(t, "", validResp.DispatchedJobID) // Dispatch with a valid token should succeed - validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid", + validToken := mock.CreatePolicyAndToken(t, store, 1003, "test-valid", mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDispatchJob})) req.AuthToken = validToken.SecretID var validResp2 structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &validResp2) - require.Nil(err) - require.NotNil(validResp2.EvalID) - require.NotNil(validResp2.DispatchedJobID) - require.NotEqual(validResp2.DispatchedJobID, "") + must.NoError(t, err) + must.NotEq(t, "", validResp2.EvalID) + must.NotEq(t, "", validResp2.DispatchedJobID) ws := memdb.NewWatchSet() - out, err := state.JobByID(ws, job.Namespace, validResp2.DispatchedJobID) - require.Nil(err) - require.NotNil(out) - require.Equal(out.ParentID, job.ID) + out, err := store.JobByID(ws, job.Namespace, validResp2.DispatchedJobID) + must.NoError(t, err) + must.NotNil(t, out) + must.Eq(t, job.ID, out.ParentID) - // Look up the evaluation - eval, err := state.EvalByID(ws, validResp2.EvalID) - require.Nil(err) - require.NotNil(eval) - require.Equal(eval.CreateIndex, validResp2.EvalCreateIndex) + // Evaluation should be created atomically with job + eval, err := store.EvalByID(ws, validResp2.EvalID) + must.NoError(t, err) + must.NotNil(t, eval) + must.Eq(t, validResp2.EvalCreateIndex, eval.CreateIndex) + must.Eq(t, validResp2.JobCreateIndex, eval.CreateIndex) } func TestJobEndpoint_Dispatch(t *testing.T) {