diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index a018fe528..526985516 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -1890,6 +1890,41 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa return err } + // Ensure that we have only one dispatched version of this job running concurrently + // by comparing the idempotency token against any non-terminal versions. + if args.IdempotencyToken != "" { + // Fetch all jobs that match the parameterized job ID prefix + iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) + if err != nil { + errMsg := "failed to retrieve jobs for idempotency check" + j.logger.Error(errMsg, "error", err) + return fmt.Errorf(errMsg) + } + + // Iterate + for { + raw := iter.Next() + if raw == nil { + break + } + + // Ensure the parent ID is an exact match + existingJob := raw.(*structs.Job) + if existingJob.ParentID != parameterizedJob.ID { + continue + } + + // Idempotency tokens match. Ensure existing job is terminal. + if existingJob.DispatchIdempotencyToken == args.IdempotencyToken { + // The existing job is either pending or running. + // Registering a new job would violate the idempotency token. + if existingJob.Status != structs.JobStatusDead { + return fmt.Errorf("idempotent dispatch failed: another child job with this token is running or pending: %s", existingJob.ID) + } + } + } + } + // Derive the child job and commit it via Raft - with initial status dispatchJob := parameterizedJob.Copy() dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now()) @@ -1909,40 +1944,6 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa dispatchJob.Meta[k] = v } - // Ensure that we have only one dispatched version of this job running concurrently - // by comparing the idempotency token against any non-terminal versions. - if args.IdempotencyToken != "" { - // Fetch all jobs that match the parameterized job ID prefix - iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID) - if err != nil { - return fmt.Errorf("failed to retrieve jobs for idempotency check") - } - - // Iterate - for { - raw := iter.Next() - if raw == nil { - break - } - - // Ensure the parent ID is an exact match - existingJob := raw.(*structs.Job) - if existingJob.ParentID != dispatchJob.ParentID { - continue - } - - // Idempotency tokens match. Ensure existing job is terminal. - if existingJob.DispatchIdempotencyToken == args.IdempotencyToken { - // The existing job is either pending or running. - // Registering a new job would violate the idempotency token. - if existingJob.Status != structs.JobStatusDead { - return fmt.Errorf("idempotent dispatch failed: another child job with this token is running or pending: %s", existingJob.ID) - } - } - } - - } - // Compress the payload dispatchJob.Payload = snappy.Encode(nil, args.Payload)