diff --git a/.changelog/26710.txt b/.changelog/26710.txt new file mode 100644 index 000000000..8348a010a --- /dev/null +++ b/.changelog/26710.txt @@ -0,0 +1,3 @@ +```release-note:bug +dispatch: Fixed a bug where evaluations were not created atomically with dispatched jobs, which could prevent dispatch jobs from creating allocations +``` diff --git a/nomad/fsm.go b/nomad/fsm.go index 2b8817b77..c760b5caf 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -734,8 +734,8 @@ func (n *nomadFSM) applyUpsertJob(msgType structs.MessageType, buf []byte, index } } - // COMPAT: Prior to Nomad 0.12.x evaluations were submitted in a separate Raft log, - // so this may be nil during server upgrades. + // Not all job registrations will include an eval (ex. registering a + // dispatch/periodic job) if req.Eval != nil { req.Eval.JobModifyIndex = index diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index e69685935..fc47a3e6e 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -2107,12 +2107,29 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa // Compress the payload dispatchJob.Payload = snappy.Encode(nil, args.Payload) + // If the job is periodic, we don't create an eval. + var eval *structs.Evaluation + if !dispatchJob.IsPeriodic() { + now := time.Now().UnixNano() + eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: dispatchJob.Priority, + Type: dispatchJob.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: dispatchJob.ID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + } + regReq := &structs.JobRegisterRequest{ Job: dispatchJob, WriteRequest: args.WriteRequest, + Eval: eval, } - // Commit this update via Raft _, jobCreateIndex, err := j.srv.raftApply(structs.JobRegisterRequestType, regReq) if err != nil { j.logger.Error("dispatched job register failed", "error", err) @@ -2123,38 +2140,9 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa reply.DispatchedJobID = dispatchJob.ID reply.Index = jobCreateIndex - // If the job is periodic, we don't create an eval. - if !dispatchJob.IsPeriodic() { - // Create a new evaluation - now := time.Now().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: dispatchJob.Priority, - Type: dispatchJob.Type, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: dispatchJob.ID, - JobModifyIndex: jobCreateIndex, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } - - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - j.logger.Error("eval create failed", "error", err, "method", "dispatch") - return err - } - - // Setup the reply + if eval != nil { reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex + reply.EvalCreateIndex = jobCreateIndex } return nil diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index cb3de1266..ec0404d3d 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -1385,7 +1385,6 @@ func TestJobEndpoint_Register_ParameterizedJob(t *testing.T) { func TestJobEndpoint_Register_Dispatched(t *testing.T) { ci.Parallel(t) - require := require.New(t) s1, cleanupS1 := TestServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -1409,8 +1408,7 @@ func TestJobEndpoint_Register_Dispatched(t *testing.T) { // Fetch the response var resp structs.JobRegisterResponse err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) - require.Error(err) - require.Contains(err.Error(), "job can't be submitted with 'Dispatched'") + must.EqError(t, err, "job can't be submitted with 'Dispatched' set") } func TestJobEndpoint_Register_EnforceIndex(t *testing.T) { @@ -6532,7 +6530,6 @@ func TestJobEndpoint_ValidateJob_PriorityNotOk(t *testing.T) { func TestJobEndpoint_Dispatch_ACL(t *testing.T) { ci.Parallel(t) - require := require.New(t) s1, root, cleanupS1 := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -6540,13 +6537,13 @@ func TestJobEndpoint_Dispatch_ACL(t *testing.T) { defer cleanupS1() codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) - state := s1.fsm.State() + store := s1.fsm.State() // Create a parameterized job job := mock.BatchJob() job.ParameterizedJob = &structs.ParameterizedJobConfig{} - err := state.UpsertJob(structs.MsgTypeTestSetup, 400, nil, job) - require.Nil(err) + err := store.UpsertJob(structs.MsgTypeTestSetup, 400, nil, job) + must.NoError(t, err) req := &structs.JobDispatchRequest{ JobID: job.ID, @@ -6559,52 +6556,49 @@ func TestJobEndpoint_Dispatch_ACL(t *testing.T) { // Attempt to fetch the response without a token should fail var resp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &resp) - require.NotNil(err) - require.Contains(err.Error(), "Permission denied") + must.EqError(t, err, "Permission denied") // Attempt to fetch the response with an invalid token should fail - invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", + invalidToken := mock.CreatePolicyAndToken(t, store, 1001, "test-invalid", mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs})) req.AuthToken = invalidToken.SecretID var invalidResp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &invalidResp) - require.NotNil(err) - require.Contains(err.Error(), "Permission denied") + must.EqError(t, err, "Permission denied") // Dispatch with a valid management token should succeed req.AuthToken = root.SecretID var validResp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &validResp) - require.Nil(err) - require.NotNil(validResp.EvalID) - require.NotNil(validResp.DispatchedJobID) - require.NotEqual(validResp.DispatchedJobID, "") + must.NoError(t, err) + must.NotEq(t, "", validResp.EvalID) + must.NotEq(t, "", validResp.DispatchedJobID) // Dispatch with a valid token should succeed - validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid", + validToken := mock.CreatePolicyAndToken(t, store, 1003, "test-valid", mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityDispatchJob})) req.AuthToken = validToken.SecretID var validResp2 structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &validResp2) - require.Nil(err) - require.NotNil(validResp2.EvalID) - require.NotNil(validResp2.DispatchedJobID) - require.NotEqual(validResp2.DispatchedJobID, "") + must.NoError(t, err) + must.NotEq(t, "", validResp2.EvalID) + must.NotEq(t, "", validResp2.DispatchedJobID) ws := memdb.NewWatchSet() - out, err := state.JobByID(ws, job.Namespace, validResp2.DispatchedJobID) - require.Nil(err) - require.NotNil(out) - require.Equal(out.ParentID, job.ID) + out, err := store.JobByID(ws, job.Namespace, validResp2.DispatchedJobID) + must.NoError(t, err) + must.NotNil(t, out) + must.Eq(t, job.ID, out.ParentID) - // Look up the evaluation - eval, err := state.EvalByID(ws, validResp2.EvalID) - require.Nil(err) - require.NotNil(eval) - require.Equal(eval.CreateIndex, validResp2.EvalCreateIndex) + // Evaluation should be created atomically with job + eval, err := store.EvalByID(ws, validResp2.EvalID) + must.NoError(t, err) + must.NotNil(t, eval) + must.Eq(t, validResp2.EvalCreateIndex, eval.CreateIndex) + must.Eq(t, validResp2.JobCreateIndex, eval.CreateIndex) } func TestJobEndpoint_Dispatch(t *testing.T) {