Move idempotency check closer to validate. Log error.

This commit is contained in:
Alex Munda
2021-07-02 10:58:42 -05:00
parent aa3512d11c
commit 4448cff8a3

View File

@@ -1890,6 +1890,41 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
return err
}
// Ensure that we have only one dispatched version of this job running concurrently
// by comparing the idempotency token against any non-terminal versions.
if args.IdempotencyToken != "" {
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
errMsg := "failed to retrieve jobs for idempotency check"
j.logger.Error(errMsg, "error", err)
return fmt.Errorf(errMsg)
}
// Iterate
for {
raw := iter.Next()
if raw == nil {
break
}
// Ensure the parent ID is an exact match
existingJob := raw.(*structs.Job)
if existingJob.ParentID != parameterizedJob.ID {
continue
}
// Idempotency tokens match. Ensure existing job is terminal.
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
// The existing job is either pending or running.
// Registering a new job would violate the idempotency token.
if existingJob.Status != structs.JobStatusDead {
return fmt.Errorf("idempotent dispatch failed: another child job with this token is running or pending: %s", existingJob.ID)
}
}
}
}
// Derive the child job and commit it via Raft - with initial status
dispatchJob := parameterizedJob.Copy()
dispatchJob.ID = structs.DispatchedID(parameterizedJob.ID, time.Now())
@@ -1909,40 +1944,6 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Meta[k] = v
}
// Ensure that we have only one dispatched version of this job running concurrently
// by comparing the idempotency token against any non-terminal versions.
if args.IdempotencyToken != "" {
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
return fmt.Errorf("failed to retrieve jobs for idempotency check")
}
// Iterate
for {
raw := iter.Next()
if raw == nil {
break
}
// Ensure the parent ID is an exact match
existingJob := raw.(*structs.Job)
if existingJob.ParentID != dispatchJob.ParentID {
continue
}
// Idempotency tokens match. Ensure existing job is terminal.
if existingJob.DispatchIdempotencyToken == args.IdempotencyToken {
// The existing job is either pending or running.
// Registering a new job would violate the idempotency token.
if existingJob.Status != structs.JobStatusDead {
return fmt.Errorf("idempotent dispatch failed: another child job with this token is running or pending: %s", existingJob.ID)
}
}
}
}
// Compress the payload
dispatchJob.Payload = snappy.Encode(nil, args.Payload)