mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Merge pull request #10806 from hashicorp/munda/idempotent-job-dispatch
Enforce idempotency of dispatched jobs using token on dispatch request
This commit is contained in:
@@ -1,5 +1,8 @@
|
||||
## 1.1.3 (Unreleased)
|
||||
|
||||
IMPROVEMENTS:
|
||||
* dispatch jobs: Added optional idempotency token to `WriteOptions` which prevents Nomad from creating new dispatched jobs for retried requests. [[GH-10806](https://github.com/hashicorp/nomad/pull/10806)]
|
||||
|
||||
## 1.1.2 (June 22, 2021)
|
||||
|
||||
IMPROVEMENTS:
|
||||
|
||||
@@ -93,6 +93,9 @@ type WriteOptions struct {
|
||||
// ctx is an optional context pass through to the underlying HTTP
|
||||
// request layer. Use Context() and WithContext() to manage this.
|
||||
ctx context.Context
|
||||
|
||||
// IdempotencyToken can be used to ensure the write is idempotent.
|
||||
IdempotencyToken string
|
||||
}
|
||||
|
||||
// QueryMeta is used to return meta data about a query
|
||||
@@ -593,6 +596,9 @@ func (r *request) setWriteOptions(q *WriteOptions) {
|
||||
if q.AuthToken != "" {
|
||||
r.token = q.AuthToken
|
||||
}
|
||||
if q.IdempotencyToken != "" {
|
||||
r.params.Set("idempotency_token", q.IdempotencyToken)
|
||||
}
|
||||
r.ctx = q.Context()
|
||||
}
|
||||
|
||||
|
||||
@@ -255,9 +255,10 @@ func TestSetWriteOptions(t *testing.T) {
|
||||
|
||||
r, _ := c.newRequest("GET", "/v1/jobs")
|
||||
q := &WriteOptions{
|
||||
Region: "foo",
|
||||
Namespace: "bar",
|
||||
AuthToken: "foobar",
|
||||
Region: "foo",
|
||||
Namespace: "bar",
|
||||
AuthToken: "foobar",
|
||||
IdempotencyToken: "idempotent",
|
||||
}
|
||||
r.setWriteOptions(q)
|
||||
|
||||
@@ -267,6 +268,9 @@ func TestSetWriteOptions(t *testing.T) {
|
||||
if r.params.Get("namespace") != "bar" {
|
||||
t.Fatalf("bad: %v", r.params)
|
||||
}
|
||||
if r.params.Get("idempotency_token") != "idempotent" {
|
||||
t.Fatalf("bad: %v", r.params)
|
||||
}
|
||||
if r.token != "foobar" {
|
||||
t.Fatalf("bad: %v", r.token)
|
||||
}
|
||||
|
||||
@@ -666,6 +666,13 @@ func parseNamespace(req *http.Request, n *string) {
|
||||
}
|
||||
}
|
||||
|
||||
// parseIdempotencyToken is used to parse the ?idempotency_token parameter
|
||||
func parseIdempotencyToken(req *http.Request, n *string) {
|
||||
if idempotencyToken := req.URL.Query().Get("idempotency_token"); idempotencyToken != "" {
|
||||
*n = idempotencyToken
|
||||
}
|
||||
}
|
||||
|
||||
// parseBool parses a query parameter to a boolean or returns (nil, nil) if the
|
||||
// parameter is not present.
|
||||
func parseBool(req *http.Request, field string) (*bool, error) {
|
||||
@@ -721,6 +728,7 @@ func (s *HTTPServer) parseWriteRequest(req *http.Request, w *structs.WriteReques
|
||||
parseNamespace(req, &w.Namespace)
|
||||
s.parseToken(req, &w.AuthToken)
|
||||
s.parseRegion(req, &w.Region)
|
||||
parseIdempotencyToken(req, &w.IdempotencyToken)
|
||||
}
|
||||
|
||||
// wrapUntrustedContent wraps handlers in a http.ResponseWriter that prevents
|
||||
|
||||
@@ -9,13 +9,14 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/golang/snappy"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
api "github.com/hashicorp/nomad/api"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/kr/pretty"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestHTTP_JobsList(t *testing.T) {
|
||||
@@ -1454,8 +1455,9 @@ func TestHTTP_JobDispatch(t *testing.T) {
|
||||
respW := httptest.NewRecorder()
|
||||
args2 := structs.JobDispatchRequest{
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
Region: "global",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
IdempotencyToken: "foo",
|
||||
},
|
||||
}
|
||||
buf := encodeReq(args2)
|
||||
|
||||
@@ -1890,6 +1890,43 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
||||
return err
|
||||
}
|
||||
|
||||
// Avoid creating new dispatched jobs for retry requests, by using the idempotency token
|
||||
if args.IdempotencyToken != "" {
|
||||
// Fetch all jobs that match the parameterized job ID prefix
|
||||
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
|
||||
if err != nil {
|
||||
errMsg := "failed to retrieve jobs for idempotency check"
|
||||
j.logger.Error(errMsg, "error", err)
|
||||
return fmt.Errorf(errMsg)
|
||||
}
|
||||
|
||||
// Iterate
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
break
|
||||
}
|
||||
|
||||
// Ensure the parent ID is an exact match
|
||||
existingJob := raw.(*structs.Job)
|
||||
if existingJob.ParentID != parameterizedJob.ID {
|
||||
continue
|
||||
}
|
||||
|
||||
// Idempotency tokens match
|
||||
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
|
||||
// The existing job has not yet been garbage collected.
|
||||
// Registering a new job would violate the idempotency token.
|
||||
// Return the existing job.
|
||||
reply.JobCreateIndex = existingJob.CreateIndex
|
||||
reply.DispatchedJobID = existingJob.ID
|
||||
reply.Index = existingJob.ModifyIndex
|
||||
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Derive the child job and commit it via Raft - with initial status
|
||||
dispatchJob := parameterizedJob.Copy()
|
||||
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
|
||||
@@ -1899,6 +1936,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
||||
dispatchJob.Dispatched = true
|
||||
dispatchJob.Status = ""
|
||||
dispatchJob.StatusDescription = ""
|
||||
dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken
|
||||
|
||||
// Merge in the meta data
|
||||
for k, v := range args.Meta {
|
||||
|
||||
@@ -6134,13 +6134,19 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
Payload: make([]byte, DispatchPayloadSizeLimit+100),
|
||||
}
|
||||
|
||||
type existingIdempotentChildJob struct {
|
||||
isTerminal bool
|
||||
}
|
||||
|
||||
type testCase struct {
|
||||
name string
|
||||
parameterizedJob *structs.Job
|
||||
dispatchReq *structs.JobDispatchRequest
|
||||
noEval bool
|
||||
err bool
|
||||
errStr string
|
||||
name string
|
||||
parameterizedJob *structs.Job
|
||||
dispatchReq *structs.JobDispatchRequest
|
||||
noEval bool
|
||||
err bool
|
||||
errStr string
|
||||
idempotencyToken string
|
||||
existingIdempotentJob *existingIdempotentChildJob
|
||||
}
|
||||
cases := []testCase{
|
||||
{
|
||||
@@ -6233,6 +6239,36 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
err: true,
|
||||
errStr: "stopped",
|
||||
},
|
||||
{
|
||||
name: "idempotency token, no existing child job",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
idempotencyToken: "foo",
|
||||
existingIdempotentJob: nil,
|
||||
},
|
||||
{
|
||||
name: "idempotency token, w/ existing non-terminal child job",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
idempotencyToken: "foo",
|
||||
existingIdempotentJob: &existingIdempotentChildJob{
|
||||
isTerminal: false,
|
||||
},
|
||||
noEval: true,
|
||||
},
|
||||
{
|
||||
name: "idempotency token, w/ existing terminal job",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
idempotencyToken: "foo",
|
||||
existingIdempotentJob: &existingIdempotentChildJob{
|
||||
isTerminal: true,
|
||||
},
|
||||
noEval: true,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
@@ -6262,8 +6298,30 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
// Now try to dispatch
|
||||
tc.dispatchReq.JobID = tc.parameterizedJob.ID
|
||||
tc.dispatchReq.WriteRequest = structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: tc.parameterizedJob.Namespace,
|
||||
Region: "global",
|
||||
Namespace: tc.parameterizedJob.Namespace,
|
||||
IdempotencyToken: tc.idempotencyToken,
|
||||
}
|
||||
|
||||
// Dispatch with the same request so a child job w/ the idempotency key exists
|
||||
var initialIdempotentDispatchResp structs.JobDispatchResponse
|
||||
if tc.existingIdempotentJob != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialIdempotentDispatchResp); err != nil {
|
||||
t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err)
|
||||
}
|
||||
|
||||
if tc.existingIdempotentJob.isTerminal {
|
||||
eval, err := s1.State().EvalByID(nil, initialIdempotentDispatchResp.EvalID)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error fetching eval %v", err)
|
||||
}
|
||||
eval = eval.Copy()
|
||||
eval.Status = structs.EvalStatusComplete
|
||||
err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialIdempotentDispatchResp.Index+1, []*structs.Evaluation{eval})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error completing eval %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var dispatchResp structs.JobDispatchResponse
|
||||
@@ -6315,6 +6373,17 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
t.Fatal("parameter job config should exist")
|
||||
}
|
||||
|
||||
// Check that the existing job is returned in the case of a supplied idempotency token
|
||||
if tc.idempotencyToken != "" && tc.existingIdempotentJob != nil {
|
||||
if dispatchResp.DispatchedJobID != initialIdempotentDispatchResp.DispatchedJobID {
|
||||
t.Fatal("dispatched job id should match initial dispatch")
|
||||
}
|
||||
|
||||
if dispatchResp.JobCreateIndex != initialIdempotentDispatchResp.JobCreateIndex {
|
||||
t.Fatal("dispatched job create index should match initial dispatch")
|
||||
}
|
||||
}
|
||||
|
||||
if tc.noEval {
|
||||
return
|
||||
}
|
||||
|
||||
@@ -385,6 +385,9 @@ type WriteRequest struct {
|
||||
// AuthToken is secret portion of the ACL token used for the request
|
||||
AuthToken string
|
||||
|
||||
// IdempotencyToken can be used to ensure the write is idempotent.
|
||||
IdempotencyToken string
|
||||
|
||||
InternalRpcInfo
|
||||
}
|
||||
|
||||
@@ -4016,6 +4019,10 @@ type Job struct {
|
||||
// parameterized job.
|
||||
Dispatched bool
|
||||
|
||||
// DispatchIdempotencyToken is optionally used to ensure that a dispatched job does not have any
|
||||
// non-terminal siblings which have the same token value.
|
||||
DispatchIdempotencyToken string
|
||||
|
||||
// Payload is the payload supplied when the job was dispatched.
|
||||
Payload []byte
|
||||
|
||||
|
||||
6
vendor/github.com/hashicorp/nomad/api/api.go
generated
vendored
6
vendor/github.com/hashicorp/nomad/api/api.go
generated
vendored
@@ -93,6 +93,9 @@ type WriteOptions struct {
|
||||
// ctx is an optional context pass through to the underlying HTTP
|
||||
// request layer. Use Context() and WithContext() to manage this.
|
||||
ctx context.Context
|
||||
|
||||
// IdempotencyToken can be used to ensure the write is idempotent.
|
||||
IdempotencyToken string
|
||||
}
|
||||
|
||||
// QueryMeta is used to return meta data about a query
|
||||
@@ -593,6 +596,9 @@ func (r *request) setWriteOptions(q *WriteOptions) {
|
||||
if q.AuthToken != "" {
|
||||
r.token = q.AuthToken
|
||||
}
|
||||
if q.IdempotencyToken != "" {
|
||||
r.params.Set("idempotency_token", q.IdempotencyToken)
|
||||
}
|
||||
r.ctx = q.Context()
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user