From 97c69ee9a7312aaa315fec5850fe0a58e13a238a Mon Sep 17 00:00:00 2001 From: Mahmood Ali Date: Fri, 10 Jul 2020 13:31:55 -0400 Subject: [PATCH] Atomic eval insertion with job (de-)registration This fixes a bug where jobs may get "stuck" unprocessed that dispropotionately affect periodic jobs around leadership transitions. When registering a job, the job registration and the eval to process it get applied to raft as two separate transactions; if the job registration succeeds but eval application fails, the job may remain unprocessed. Operators may detect such failure, when submitting a job update and get a 500 error code, and they could retry; periodic jobs failures are more likely to go unnoticed, and no further periodic invocations will be processed until an operator force evaluation. This fixes the issue by ensuring that the job registration and eval application get persisted and processed atomically in the same raft log entry. Also, applies the same change to ensure atomicity in job deregistration. Backward Compatibility We must maintain compatibility in two scenarios: mixed clusters where a leader can handle atomic updates but followers cannot, and a recent cluster processes old log entries from legacy or mixed cluster mode. To handle this constraints: ensure that the leader continue to emit the Evaluation log entry until all servers have upgraded; also, when processing raft logs, the servers honor evaluations found in both spots, the Eval in job (de-)registration and the eval update entries. When an updated server sees mix-mode behavior where an eval is inserted into the raft log twice, it ignores the second instance. I made one compromise in consistency in the mixed-mode scenario: servers may disagree on the eval.CreateIndex value: the leader and updated servers will report the job registration index while old servers will report the index of the eval update log entry. This discripency doesn't seem to be material - it's the eval.JobModifyIndex that matters. --- nomad/fsm.go | 32 ++- nomad/job_endpoint.go | 162 ++++++------ nomad/job_endpoint_test.go | 494 +++++++++++++++++++++++++++++++++++++ nomad/leader.go | 2 + nomad/periodic.go | 65 ++--- nomad/state/state_store.go | 17 ++ nomad/structs/structs.go | 6 + 7 files changed, 677 insertions(+), 101 deletions(-) diff --git a/nomad/fsm.go b/nomad/fsm.go index 10ca959fb..28f12da5f 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -555,6 +555,13 @@ func (n *nomadFSM) applyUpsertJob(buf []byte, index uint64) interface{} { } } + if req.Eval != nil { + req.Eval.JobModifyIndex = index + if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { + return err + } + } + return nil } @@ -565,14 +572,30 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } - return n.state.WithWriteTransaction(func(tx state.Txn) error { - if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx); err != nil { + err := n.state.WithWriteTransaction(func(tx state.Txn) error { + err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge, tx) + + if err != nil { n.logger.Error("deregistering job failed", "error", err) return err } return nil }) + + // always attempt upsert eval even if job deregister fail + if req.Eval != nil { + req.Eval.JobModifyIndex = index + if err := n.upsertEvals(index, []*structs.Evaluation{req.Eval}); err != nil { + return err + } + } + + if err != nil { + return err + } + + return nil } func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} { @@ -663,7 +686,10 @@ func (n *nomadFSM) applyUpdateEval(buf []byte, index uint64) interface{} { } func (n *nomadFSM) upsertEvals(index uint64, evals []*structs.Evaluation) error { - if err := n.state.UpsertEvals(index, evals); err != nil { + if err := n.state.UpsertEvals(index, evals); len(evals) == 1 && err == state.ErrDuplicateEval { + // the request is a duplicate, ignore processing it + return nil + } else if err != nil { n.logger.Error("UpsertEvals failed", "error", err) return err } diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index a1b5df3c8..424f2d51b 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -312,10 +312,31 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } + // Create a new evaluation + now := time.Now().UTC().UnixNano() + submittedEval := false + + // Set the submit time + args.Job.SubmitTime = time.Now().UTC().UnixNano() + + // If the job is periodic or parameterized, we don't create an eval. + if !(args.Job.IsPeriodic() || args.Job.IsParameterized()) { + args.Eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: args.Job.Priority, + Type: args.Job.Type, + TriggeredBy: structs.EvalTriggerJobRegister, + JobID: args.Job.ID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + reply.EvalID = args.Eval.ID + } + // Check if the job has changed at all if existingJob == nil || existingJob.SpecChanged(args.Job) { - // Set the submit time - args.Job.SetSubmitTime() // Commit this update via Raft fsmErr, index, err := j.srv.raftApply(structs.JobRegisterRequestType, args) @@ -328,8 +349,16 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis return err } + submittedEval = true + // Populate the reply with job information reply.JobModifyIndex = index + reply.Index = index + + if args.Eval != nil { + reply.EvalCreateIndex = index + } + } else { reply.JobModifyIndex = existingJob.JobModifyIndex } @@ -337,43 +366,34 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis // used for multiregion start args.Job.JobModifyIndex = reply.JobModifyIndex - // If the job is periodic or parameterized, we don't create an eval. - if args.Job.IsPeriodic() || args.Job.IsParameterized() { + if args.Eval == nil { return nil } - // Create a new evaluation - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: args.Job.Priority, - Type: args.Job.Type, - TriggeredBy: structs.EvalTriggerJobRegister, - JobID: args.Job.ID, - JobModifyIndex: reply.JobModifyIndex, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } + // COMPAT(1.1.0): Remove the ServerMeetMinimumVersion check. + // 0.12.1 introduced atomic eval job registration + if args.Eval != nil && + !(submittedEval && ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false)) { + args.Eval.JobModifyIndex = reply.JobModifyIndex + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{args.Eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } - // Commit this evaluation via Raft - // XXX: There is a risk of partial failure where the JobRegister succeeds - // but that the EvalUpdate does not. - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - j.logger.Error("eval create failed", "error", err, "method", "register") - return err - } + // Commit this evaluation via Raft + // There is a risk of partial failure where the JobRegister succeeds + // but that the EvalUpdate does not, before 0.12.1 + _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + j.logger.Error("eval create failed", "error", err, "method", "register") + return err + } - // Populate the reply with eval information - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex + if !submittedEval { + reply.EvalCreateIndex = evalIndex + reply.Index = evalIndex + } + } // Kick off a multiregion deployment (enterprise only). if isRunner { @@ -766,6 +786,25 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD } } + // The job priority / type is strange for this, since it's not a high + // priority even if the job was. + now := time.Now().UTC().UnixNano() + // If the job is periodic or parameterized, we don't create an eval. + if job == nil || !(job.IsPeriodic() || job.IsParameterized()) { + args.Eval = &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: args.RequestNamespace(), + Priority: structs.JobDefaultPriority, + Type: structs.JobTypeService, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: args.JobID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + reply.EvalID = args.Eval.ID + } + // Commit the job update via Raft _, index, err := j.srv.raftApply(structs.JobDeregisterRequestType, args) if err != nil { @@ -775,6 +814,8 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // Populate the reply with job information reply.JobModifyIndex = index + reply.EvalCreateIndex = index + reply.Index = index // Make a raft apply to release the CSI volume claims of terminal allocs. var result *multierror.Error @@ -783,44 +824,25 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD result = multierror.Append(result, err) } - // If the job is periodic or parameterized, we don't create an eval. - if job != nil && (job.IsPeriodic() || job.IsParameterized()) { - return nil + // COMPAT(1.1.0) - 0.12.1 introduced atomic job deregistration eval + if args.Eval != nil && + !ServersMeetMinimumVersion(j.srv.Members(), minJobRegisterAtomicEvalVersion, false) { + // Create a new evaluation + args.Eval.JobModifyIndex = index + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{args.Eval}, + WriteRequest: structs.WriteRequest{Region: args.Region}, + } + + // Commit this evaluation via Raft + _, _, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + result = multierror.Append(result, err) + j.logger.Error("eval create failed", "error", err, "method", "deregister") + return result.ErrorOrNil() + } } - // Create a new evaluation - // XXX: The job priority / type is strange for this, since it's not a high - // priority even if the job was. - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: args.RequestNamespace(), - Priority: structs.JobDefaultPriority, - Type: structs.JobTypeService, - TriggeredBy: structs.EvalTriggerJobDeregister, - JobID: args.JobID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, - WriteRequest: structs.WriteRequest{Region: args.Region}, - } - - // Commit this evaluation via Raft - _, evalIndex, err := j.srv.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - result = multierror.Append(result, err) - j.logger.Error("eval create failed", "error", err, "method", "deregister") - return result.ErrorOrNil() - } - - // Populate the reply with eval information - reply.EvalID = eval.ID - reply.EvalCreateIndex = evalIndex - reply.Index = evalIndex return result.ErrorOrNil() } diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index e1dc0727a..dbc96f133 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -10,6 +10,7 @@ import ( memdb "github.com/hashicorp/go-memdb" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/raft" "github.com/kr/pretty" "github.com/stretchr/testify/require" @@ -1571,6 +1572,318 @@ func TestJobEndpoint_Register_SemverConstraint(t *testing.T) { }) } +// TestJobEndpoint_Register_EvalCreation_Modern asserts that job register creates an eval +// atomically with the registration +func TestJobEndpoint_Register_EvalCreation_Modern(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + //// initial registration should create the job and a new eval + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.NotEmpty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex) + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + + //// re-registration should create a new eval, but leave the job untouched + var resp2 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp2) + require.NoError(t, err) + require.NotZero(t, resp2.Index) + require.NotEmpty(t, resp2.EvalID) + require.NotEqual(t, resp.EvalID, resp2.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp2.JobModifyIndex, out.CreateIndex) + require.Equal(t, out.CreateIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.NotNil(t, raftEval) + require.Equal(t, eval.CreateIndex, raftEval.CreateIndex) + require.Equal(t, eval, raftEval) + + //// an update should update the job and create a new eval + req.Job.TaskGroups[0].Name += "a" + var resp3 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp3) + require.NoError(t, err) + require.NotZero(t, resp3.Index) + require.NotEmpty(t, resp3.EvalID) + require.NotEqual(t, resp.EvalID, resp3.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp3.JobModifyIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp3.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex) + + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.Empty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + }) +} + +// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job register creates an eval +// atomically with the registration, but handle legacy clients by adding a new eval update +func TestJobEndpoint_Register_EvalCreation_Legacy(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + + // simulate presense of a server that doesn't handle + // new registration eval + c.Build = "0.12.0" + }) + defer cleanupS2() + + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // keep s1 as the leader + if leader, _ := s1.getLeader(); !leader { + s1, s2 = s2, s1 + } + + codec := rpcClient(t, s1) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + //// initial registration should create the job and a new eval + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.NotEmpty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + + // Lookup the evaluation + eval, err := state.EvalByID(nil, resp.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.NotNil(t, raftEval) + require.Equal(t, eval.ID, raftEval.ID) + require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) + require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + + //// re-registration should create a new eval, but leave the job untouched + var resp2 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp2) + require.NoError(t, err) + require.NotZero(t, resp2.Index) + require.NotEmpty(t, resp2.EvalID) + require.NotEqual(t, resp.EvalID, resp2.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp2.JobModifyIndex, out.CreateIndex) + require.Equal(t, out.CreateIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp2.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp2.EvalCreateIndex, eval.CreateIndex) + + // this raft eval is the one found above + raftEval = evalUpdateFromRaft(t, s1, eval.ID) + require.Equal(t, raftEval.CreateIndex, eval.CreateIndex) + require.Equal(t, eval, raftEval) + + //// an update should update the job and create a new eval + req.Job.TaskGroups[0].Name += "a" + var resp3 structs.JobRegisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp3) + require.NoError(t, err) + require.NotZero(t, resp3.Index) + require.NotEmpty(t, resp3.EvalID) + require.NotEqual(t, resp.EvalID, resp3.EvalID) + + // Check for the job in the FSM + state = s1.fsm.State() + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp3.JobModifyIndex, out.JobModifyIndex) + + // Lookup the evaluation + eval, err = state.EvalByID(nil, resp3.EvalID) + require.NoError(t, err) + require.NotNil(t, eval) + require.Equal(t, resp3.EvalCreateIndex, eval.CreateIndex) + + raftEval = evalUpdateFromRaft(t, s1, eval.ID) + require.NotNil(t, raftEval) + require.Equal(t, eval.ID, raftEval.ID) + require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) + require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + require.Empty(t, resp.EvalID) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.NoError(t, err) + require.NotNil(t, out) + require.Equal(t, resp.JobModifyIndex, out.CreateIndex) + }) +} + +// evalUpdateFromRaft searches the raft logs for the eval update pertaining to the eval +func evalUpdateFromRaft(t *testing.T, s *Server, evalID string) *structs.Evaluation { + var store raft.LogStore = s.raftInmem + if store == nil { + store = s.raftStore + } + require.NotNil(t, store) + + li, _ := store.LastIndex() + for i, _ := store.FirstIndex(); i <= li; i++ { + var log raft.Log + err := store.GetLog(i, &log) + require.NoError(t, err) + + if log.Type != raft.LogCommand { + continue + } + + if structs.MessageType(log.Data[0]) != structs.EvalUpdateRequestType { + continue + } + + var req structs.EvalUpdateRequest + structs.Decode(log.Data[1:], &req) + require.NoError(t, err) + + for _, eval := range req.Evals { + if eval.ID == evalID { + eval.CreateIndex = i + eval.ModifyIndex = i + return eval + } + } + } + + return nil +} + func TestJobEndpoint_Revert(t *testing.T) { t.Parallel() @@ -2839,6 +3152,187 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) { } } +// TestJobEndpoint_Deregister_EvalCreation_Modern asserts that job deregister creates an eval +// atomically with the registration +func TestJobEndpoint_Deregister_EvalCreation_Modern(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register request + t.Run("job de-registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.NotEmpty(t, resp2.EvalID) + + state := s1.fsm.State() + eval, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(t, err) + require.NotNil(t, eval) + require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex) + + require.Nil(t, evalUpdateFromRaft(t, s1, eval.ID)) + + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.Empty(t, resp2.EvalID) + }) +} + +// TestJobEndpoint_Register_EvalCreation_Legacy asserts that job deregister creates an eval +// atomically with the registration, but handle legacy clients by adding a new eval update +func TestJobEndpoint_Deregister_EvalCreation_Legacy(t *testing.T) { + t.Parallel() + + s1, cleanupS1 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer cleanupS1() + + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 2 + c.NumSchedulers = 0 // Prevent automatic dequeue + + // simulate presense of a server that doesn't handle + // new registration eval + c.Build = "0.12.0" + }) + defer cleanupS2() + + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // keep s1 as the leader + if leader, _ := s1.getLeader(); !leader { + s1, s2 = s2, s1 + } + + codec := rpcClient(t, s1) + + // Create the register request + t.Run("job registration always create evals", func(t *testing.T) { + job := mock.Job() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.NotEmpty(t, resp2.EvalID) + + state := s1.fsm.State() + eval, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(t, err) + require.NotNil(t, eval) + require.EqualValues(t, resp2.EvalCreateIndex, eval.CreateIndex) + + raftEval := evalUpdateFromRaft(t, s1, eval.ID) + require.NotNil(t, raftEval) + require.Equal(t, eval.ID, raftEval.ID) + require.Equal(t, eval.JobModifyIndex, raftEval.JobModifyIndex) + require.Greater(t, raftEval.CreateIndex, eval.CreateIndex) + }) + + // Registering a parameterized job shouldn't create an eval + t.Run("periodic jobs shouldn't create an eval", func(t *testing.T) { + job := mock.PeriodicJob() + req := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + var resp structs.JobRegisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp) + require.NoError(t, err) + require.NotZero(t, resp.Index) + + dereg := &structs.JobDeregisterRequest{ + JobID: job.ID, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2) + require.NoError(t, err) + require.Empty(t, resp2.EvalID) + }) +} + func TestJobEndpoint_BatchDeregister(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/leader.go b/nomad/leader.go index ca071d2c8..c19b2159b 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -46,6 +46,8 @@ var minSchedulerConfigVersion = version.Must(version.NewVersion("0.9.0")) var minClusterIDVersion = version.Must(version.NewVersion("0.10.4")) +var minJobRegisterAtomicEvalVersion = version.Must(version.NewVersion("0.12.1")) + // monitorLeadership is used to monitor if we acquire or lose our role // as the leader in the Raft cluster. There is some work the leader is // expected to do, so we must react to changes diff --git a/nomad/periodic.go b/nomad/periodic.go index 0a64fac7d..a2ea2219f 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -46,10 +46,24 @@ type JobEvalDispatcher interface { // DispatchJob creates an evaluation for the passed job and commits both the // evaluation and the job to the raft log. It returns the eval. func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { + now := time.Now().UTC().UnixNano() + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: job.Namespace, + Priority: job.Priority, + Type: job.Type, + TriggeredBy: structs.EvalTriggerPeriodicJob, + JobID: job.ID, + Status: structs.EvalStatusPending, + CreateTime: now, + ModifyTime: now, + } + // Commit this update via Raft job.SetSubmitTime() req := structs.JobRegisterRequest{ - Job: job, + Job: job, + Eval: eval, WriteRequest: structs.WriteRequest{ Namespace: job.Namespace, }, @@ -62,35 +76,30 @@ func (s *Server) DispatchJob(job *structs.Job) (*structs.Evaluation, error) { return nil, err } - // Create a new evaluation - now := time.Now().UTC().UnixNano() - eval := &structs.Evaluation{ - ID: uuid.Generate(), - Namespace: job.Namespace, - Priority: job.Priority, - Type: job.Type, - TriggeredBy: structs.EvalTriggerPeriodicJob, - JobID: job.ID, - JobModifyIndex: index, - Status: structs.EvalStatusPending, - CreateTime: now, - ModifyTime: now, - } - update := &structs.EvalUpdateRequest{ - Evals: []*structs.Evaluation{eval}, + eval.CreateIndex = index + eval.ModifyIndex = index + + // COMPAT(1.1): Remove in 1.1.0 - 0.12.1 introduced atomic eval job registration + if !ServersMeetMinimumVersion(s.Members(), minJobRegisterAtomicEvalVersion, false) { + // Create a new evaluation + eval.JobModifyIndex = index + update := &structs.EvalUpdateRequest{ + Evals: []*structs.Evaluation{eval}, + } + + // Commit this evaluation via Raft + // There is a risk of partial failure where the JobRegister succeeds + // but that the EvalUpdate does not, before Nomad 0.12.1 + _, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update) + if err != nil { + return nil, err + } + + // Update its indexes. + eval.CreateIndex = evalIndex + eval.ModifyIndex = evalIndex } - // Commit this evaluation via Raft - // XXX: There is a risk of partial failure where the JobRegister succeeds - // but that the EvalUpdate does not. - _, evalIndex, err := s.raftApply(structs.EvalUpdateRequestType, update) - if err != nil { - return nil, err - } - - // Update its indexes. - eval.CreateIndex = evalIndex - eval.ModifyIndex = evalIndex return eval, nil } diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index eb57f3814..058237367 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2540,6 +2540,19 @@ func (s *StateStore) UpsertEvalsTxn(index uint64, evals []*structs.Evaluation, t return nil } +// ErrDuplicateEval indicates that the eval is already present in state and ought not +// to be reprocessed again. +// +// Such duplication might occur in the background compatibility path in job registration +// or deregistration +var ErrDuplicateEval = fmt.Errorf("eval already exists") + +func isPotentialDuplicateEval(eval *structs.Evaluation) bool { + return eval.Status == structs.EvalStatusPending && + (eval.TriggeredBy == structs.EvalTriggerJobRegister || + eval.TriggeredBy == structs.EvalTriggerPeriodicJob) +} + // nestedUpsertEvaluation is used to nest an evaluation upsert within a transaction func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *structs.Evaluation) error { // Lookup the evaluation @@ -2548,6 +2561,10 @@ func (s *StateStore) nestedUpsertEval(txn *memdb.Txn, index uint64, eval *struct return fmt.Errorf("eval lookup failed: %v", err) } + if existing != nil && isPotentialDuplicateEval(eval) { + return ErrDuplicateEval + } + // Update the indexes if existing != nil { eval.CreateIndex = existing.(*structs.Evaluation).CreateIndex diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index cc0550aba..a5ad2eea2 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -563,6 +563,9 @@ type JobRegisterRequest struct { // PolicyOverride is set when the user is attempting to override any policies PolicyOverride bool + // Eval is the evaluation that is associated with the job registration + Eval *Evaluation + WriteRequest } @@ -576,6 +579,9 @@ type JobDeregisterRequest struct { // garbage collector Purge bool + // Eval is the evaluation to create that's associated with job deregister + Eval *Evaluation + WriteRequest }