Enforce idempotency of dispatched jobs using special meta key

This commit is contained in:
Alex Munda
2021-06-23 16:51:59 -05:00
parent 09bd33640b
commit c4be87ad1c

View File

@@ -32,6 +32,11 @@ const (
// DispatchPayloadSizeLimit is the maximum size of the uncompressed input
// data payload.
DispatchPayloadSizeLimit = 16 * 1024
// MetaDispatchIdempotencyKey is the meta key that when provided, is used
// to perform an idempotency check to ensure only 1 child of a parameterized job
// with the supplied key may be running (or pending) at a time.
MetaDispatchIdempotencyKey = "nomad_dispatch_idempotency_key"
)
// ErrMultipleNamespaces is send when multiple namespaces are used in the OSS setup
@@ -1908,6 +1913,39 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
dispatchJob.Meta[k] = v
}
// Check to see if an idempotency key was provided on the meta
if idempotencyKey, ok := dispatchJob.Meta[MetaDispatchIdempotencyKey]; ok {
// Fetch all jobs that match the parameterized job ID prefix
iter, err := snap.JobsByIDPrefix(ws, parameterizedJob.Namespace, parameterizedJob.ID)
if err != nil {
return fmt.Errorf("failed to retrieve jobs for idempotency check")
}
// Iterate
for {
raw := iter.Next()
if raw == nil {
break
}
// Ensure the parent ID is an exact match
existingJob := raw.(*structs.Job)
if existingJob.ParentID != dispatchJob.ParentID {
continue
}
// Idempotency keys match. Ensure existing job is not currently running.
if ik, ok := existingJob.Meta[MetaDispatchIdempotencyKey]; ok && ik == idempotencyKey {
// The existing job is either pending or running.
// Registering a new job would violate the idempotency key.
if existingJob.Status != structs.JobStatusDead {
return fmt.Errorf("dispatch violates idempotency key of non-terminal child job: %s", existingJob.ID)
}
}
}
}
// Compress the payload
dispatchJob.Payload = snappy.Encode(nil, args.Payload)