mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
Add priority flag to Dispatch CLI and API (#25622)
* Add priority flag to Dispatch CLI and DispatchOpts() helper to HTTP API
This commit is contained in:
3
.changelog/25622.txt
Normal file
3
.changelog/25622.txt
Normal file
@@ -0,0 +1,3 @@
|
||||
```release-note:improvement
|
||||
command: added priority flag to job dispatch command
|
||||
```
|
||||
32
api/jobs.go
32
api/jobs.go
@@ -529,16 +529,39 @@ func (j *Jobs) Summary(jobID string, q *QueryOptions) (*JobSummary, *QueryMeta,
|
||||
return &resp, qm, nil
|
||||
}
|
||||
|
||||
// DispatchOptions is used to pass through job dispatch parameters
|
||||
type DispatchOptions struct {
|
||||
JobID string
|
||||
Meta map[string]string
|
||||
Payload []byte
|
||||
IdPrefixTemplate string
|
||||
Priority int
|
||||
}
|
||||
|
||||
func (j *Jobs) Dispatch(jobID string, meta map[string]string,
|
||||
payload []byte, idPrefixTemplate string, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
|
||||
var resp JobDispatchResponse
|
||||
req := &JobDispatchRequest{
|
||||
|
||||
return j.DispatchOpts(&DispatchOptions{
|
||||
JobID: jobID,
|
||||
Meta: meta,
|
||||
Payload: payload,
|
||||
IdPrefixTemplate: idPrefixTemplate,
|
||||
IdPrefixTemplate: idPrefixTemplate},
|
||||
q,
|
||||
)
|
||||
}
|
||||
|
||||
// DispatchOpts is used to dispatch a new job with the passed DispatchOpts. It
|
||||
// returns the ID of the evaluation, along with any errors encountered.
|
||||
func (j *Jobs) DispatchOpts(opts *DispatchOptions, q *WriteOptions) (*JobDispatchResponse, *WriteMeta, error) {
|
||||
var resp JobDispatchResponse
|
||||
req := &JobDispatchRequest{
|
||||
JobID: opts.JobID,
|
||||
Meta: opts.Meta,
|
||||
Payload: opts.Payload,
|
||||
IdPrefixTemplate: opts.IdPrefixTemplate,
|
||||
Priority: opts.Priority,
|
||||
}
|
||||
wm, err := j.client.put("/v1/job/"+url.PathEscape(jobID)+"/dispatch", req, &resp, q)
|
||||
wm, err := j.client.put("/v1/job/"+url.PathEscape(opts.JobID)+"/dispatch", req, &resp, q)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@@ -1571,6 +1594,7 @@ type JobDispatchRequest struct {
|
||||
Payload []byte
|
||||
Meta map[string]string
|
||||
IdPrefixTemplate string
|
||||
Priority int
|
||||
}
|
||||
|
||||
type JobDispatchResponse struct {
|
||||
|
||||
@@ -1964,9 +1964,8 @@ func TestHTTP_JobDispatch(t *testing.T) {
|
||||
},
|
||||
}
|
||||
var resp structs.JobRegisterResponse
|
||||
if err := s.Agent.RPC("Job.Register", &args, &resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err := s.Agent.RPC("Job.Register", &args, &resp)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Make the request
|
||||
respW := httptest.NewRecorder()
|
||||
@@ -1981,29 +1980,136 @@ func TestHTTP_JobDispatch(t *testing.T) {
|
||||
|
||||
// Make the HTTP request
|
||||
req2, err := http.NewRequest(http.MethodPut, "/v1/job/"+job.ID+"/dispatch", buf)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
must.NoError(t, err)
|
||||
respW.Flush()
|
||||
|
||||
// Make the request
|
||||
obj, err := s.Server.JobSpecificRequest(respW, req2)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
must.NoError(t, err)
|
||||
|
||||
// Check the response
|
||||
dispatch := obj.(structs.JobDispatchResponse)
|
||||
if dispatch.EvalID == "" {
|
||||
t.Fatalf("bad: %v", dispatch)
|
||||
}
|
||||
|
||||
if dispatch.DispatchedJobID == "" {
|
||||
t.Fatalf("bad: %v", dispatch)
|
||||
}
|
||||
must.NotEq(t, dispatch.EvalID, "")
|
||||
must.NotEq(t, dispatch.DispatchedJobID, "")
|
||||
})
|
||||
}
|
||||
|
||||
func TestHTTP_JobDispatchPriority(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
defaultPriority := 50
|
||||
validPriority := 90
|
||||
invalidPriority := -1
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
dispatchReq *structs.JobDispatchRequest
|
||||
expectedPriority int
|
||||
expectedErr bool
|
||||
}{
|
||||
{
|
||||
name: "no priority",
|
||||
dispatchReq: &structs.JobDispatchRequest{
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
IdempotencyToken: "foo",
|
||||
},
|
||||
},
|
||||
expectedPriority: defaultPriority,
|
||||
},
|
||||
{
|
||||
name: "set invalid priority",
|
||||
dispatchReq: &structs.JobDispatchRequest{
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
IdempotencyToken: "foo",
|
||||
},
|
||||
Priority: invalidPriority,
|
||||
},
|
||||
expectedPriority: invalidPriority,
|
||||
expectedErr: true,
|
||||
},
|
||||
{
|
||||
name: "set valid priority",
|
||||
dispatchReq: &structs.JobDispatchRequest{
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
IdempotencyToken: "foo",
|
||||
},
|
||||
Priority: validPriority,
|
||||
},
|
||||
expectedPriority: validPriority,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
|
||||
// Create the parameterized job
|
||||
job := mock.BatchJob()
|
||||
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
|
||||
|
||||
args := structs.JobRegisterRequest{
|
||||
Job: job,
|
||||
WriteRequest: structs.WriteRequest{
|
||||
Region: "global",
|
||||
Namespace: structs.DefaultNamespace,
|
||||
},
|
||||
}
|
||||
|
||||
var resp structs.JobRegisterResponse
|
||||
err := s.Agent.RPC("Job.Register", &args, &resp)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Make the request
|
||||
respW := httptest.NewRecorder()
|
||||
args2 := tc.dispatchReq
|
||||
buf := encodeReq(args2)
|
||||
|
||||
// Make the HTTP request
|
||||
req2, err := http.NewRequest(http.MethodPut, "/v1/job/"+job.ID+"/dispatch", buf)
|
||||
must.NoError(t, err)
|
||||
respW.Flush()
|
||||
|
||||
// Make the request
|
||||
obj, err := s.Server.JobSpecificRequest(respW, req2)
|
||||
if tc.expectedErr {
|
||||
must.ErrorContains(t, err, "job priority must be between")
|
||||
return
|
||||
} else {
|
||||
must.NoError(t, err)
|
||||
}
|
||||
|
||||
// Check the response
|
||||
dispatch := obj.(structs.JobDispatchResponse)
|
||||
must.NotEq(t, dispatch.EvalID, "")
|
||||
must.NotEq(t, dispatch.DispatchedJobID, "")
|
||||
must.NotEq(t, job.ID, dispatch.DispatchedJobID)
|
||||
|
||||
//Check job priority
|
||||
|
||||
// Make the HTTP request
|
||||
bufInfo := encodeReq(structs.Job{ParentID: job.ID})
|
||||
reqInfo, err := http.NewRequest(http.MethodGet, "/v1/job/"+dispatch.DispatchedJobID, bufInfo)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Make the request
|
||||
respInfo := httptest.NewRecorder()
|
||||
objInfo, err := s.Server.JobSpecificRequest(respInfo, reqInfo)
|
||||
must.NoError(t, err)
|
||||
respInfo.Flush()
|
||||
|
||||
// Check the response
|
||||
dispatchJob := objInfo.(*structs.Job)
|
||||
must.Eq(t, tc.expectedPriority, dispatchJob.Priority)
|
||||
})
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestHTTP_JobDispatchPayload(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
httpTest(t, nil, func(s *TestAgent) {
|
||||
|
||||
@@ -122,6 +122,7 @@ func (c *JobDispatchCommand) Run(args []string) int {
|
||||
var idempotencyToken string
|
||||
var meta []string
|
||||
var idPrefixTemplate string
|
||||
var priority int
|
||||
|
||||
flags := c.Meta.FlagSet(c.Name(), FlagSetClient)
|
||||
flags.Usage = func() { c.Ui.Output(c.Help()) }
|
||||
@@ -131,7 +132,7 @@ func (c *JobDispatchCommand) Run(args []string) int {
|
||||
flags.Var((*flaghelper.StringFlag)(&meta), "meta", "")
|
||||
flags.StringVar(&idPrefixTemplate, "id-prefix-template", "", "")
|
||||
flags.BoolVar(&openURL, "ui", false, "")
|
||||
|
||||
flags.IntVar(&priority, "priority", 0, "")
|
||||
if err := flags.Parse(args); err != nil {
|
||||
return 1
|
||||
}
|
||||
@@ -201,7 +202,14 @@ func (c *JobDispatchCommand) Run(args []string) int {
|
||||
IdempotencyToken: idempotencyToken,
|
||||
Namespace: namespace,
|
||||
}
|
||||
resp, _, err := client.Jobs().Dispatch(jobID, metaMap, payload, idPrefixTemplate, w)
|
||||
opts := &api.DispatchOptions{
|
||||
JobID: jobID,
|
||||
Meta: metaMap,
|
||||
Payload: payload,
|
||||
IdPrefixTemplate: idPrefixTemplate,
|
||||
Priority: priority,
|
||||
}
|
||||
resp, _, err := client.Jobs().DispatchOpts(opts, w)
|
||||
if err != nil {
|
||||
c.Ui.Error(fmt.Sprintf("Failed to dispatch job: %s", err))
|
||||
return 1
|
||||
|
||||
@@ -4,6 +4,7 @@
|
||||
package command
|
||||
|
||||
import (
|
||||
"strconv"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
@@ -104,6 +105,7 @@ func TestJobDispatchCommand_ACL(t *testing.T) {
|
||||
job := mock.MinJob()
|
||||
job.Type = "batch"
|
||||
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
|
||||
job.Priority = 20 //set priority on parent job
|
||||
state := srv.Agent.Server().State()
|
||||
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, job)
|
||||
must.NoError(t, err)
|
||||
@@ -210,3 +212,101 @@ namespace "default" {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestJobDispatchCommand_Priority(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
defaultJobPriority := 50
|
||||
// Start server
|
||||
srv, client, url := testServer(t, true, nil)
|
||||
t.Cleanup(srv.Shutdown)
|
||||
|
||||
waitForNodes(t, client)
|
||||
|
||||
// Create a parameterized job.
|
||||
job := mock.MinJob()
|
||||
job.Type = "batch"
|
||||
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
|
||||
job.Priority = defaultJobPriority // set default priority on parent job
|
||||
state := srv.Agent.Server().State()
|
||||
err := state.UpsertJob(structs.MsgTypeTestSetup, 100, nil, job)
|
||||
must.NoError(t, err)
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
priority string
|
||||
expectedErr bool
|
||||
additionalFlags []string
|
||||
payload map[string]string
|
||||
}{
|
||||
{
|
||||
name: "no priority",
|
||||
},
|
||||
{
|
||||
name: "valid priority",
|
||||
priority: "80",
|
||||
},
|
||||
{
|
||||
name: "invalid priority",
|
||||
priority: "-1",
|
||||
expectedErr: true,
|
||||
},
|
||||
{
|
||||
name: "priority + flag",
|
||||
priority: "90",
|
||||
additionalFlags: []string{"-verbose"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
ui := cli.NewMockUi()
|
||||
cmd := &JobDispatchCommand{Meta: Meta{Ui: ui}}
|
||||
args := []string{
|
||||
"-address", url,
|
||||
}
|
||||
// Add priority, if present
|
||||
if len(tc.priority) >= 1 {
|
||||
args = append(args, []string{"-priority", tc.priority}...)
|
||||
}
|
||||
|
||||
// Add additional flags, if present
|
||||
if len(tc.additionalFlags) >= 1 {
|
||||
args = append(args, tc.additionalFlags...)
|
||||
}
|
||||
|
||||
// Add job ID to the command.
|
||||
args = append(args, job.ID)
|
||||
|
||||
// Run command.
|
||||
code := cmd.Run(args)
|
||||
if !tc.expectedErr {
|
||||
must.Zero(t, code)
|
||||
} else {
|
||||
// Confirm expected error case
|
||||
must.NonZero(t, code)
|
||||
out := ui.ErrorWriter.String()
|
||||
must.StrContains(t, out, "dispatch job priority must be between [1, 100]")
|
||||
return
|
||||
}
|
||||
|
||||
// Confirm successful dispatch and parse job ID
|
||||
out := ui.OutputWriter.String()
|
||||
must.StrContains(t, out, "Dispatched Job ID =")
|
||||
parts := strings.Fields(out)
|
||||
id := strings.TrimSpace(parts[4])
|
||||
|
||||
// Confirm dispatched job priority set correctly
|
||||
job, _, err := client.Jobs().Info(id, nil)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, job)
|
||||
|
||||
if len(tc.priority) >= 1 {
|
||||
priority, err := strconv.Atoi(tc.priority)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, job.Priority, &priority)
|
||||
} else {
|
||||
must.Eq(t, defaultJobPriority, *job.Priority)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2035,8 +2035,14 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
||||
return fmt.Errorf("Specified job %q is stopped", args.JobID)
|
||||
}
|
||||
|
||||
// Validate the arguments
|
||||
if err := validateDispatchRequest(args, parameterizedJob); err != nil {
|
||||
// Set priority to match parent job if unset
|
||||
if args.Priority == 0 {
|
||||
args.Priority = parameterizedJob.Priority
|
||||
}
|
||||
|
||||
// Validate the arguments and parameterized job
|
||||
agentConfig := j.srv.config
|
||||
if err := validateDispatchRequest(args, parameterizedJob, agentConfig); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2087,6 +2093,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
||||
dispatchJob.Status = ""
|
||||
dispatchJob.StatusDescription = ""
|
||||
dispatchJob.DispatchIdempotencyToken = args.IdempotencyToken
|
||||
dispatchJob.Priority = args.Priority
|
||||
|
||||
// Merge in the meta data
|
||||
for k, v := range args.Meta {
|
||||
@@ -2154,7 +2161,7 @@ func (j *Job) Dispatch(args *structs.JobDispatchRequest, reply *structs.JobDispa
|
||||
|
||||
// validateDispatchRequest returns whether the request is valid given the
|
||||
// parameterized job.
|
||||
func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job) error {
|
||||
func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job, config *Config) error {
|
||||
// Check the payload constraint is met
|
||||
hasInputData := len(req.Payload) != 0
|
||||
if job.ParameterizedJob.Payload == structs.DispatchPayloadRequired && !hasInputData {
|
||||
@@ -2215,6 +2222,11 @@ func validateDispatchRequest(req *structs.JobDispatchRequest, job *structs.Job)
|
||||
return fmt.Errorf("Dispatch did not provide required meta keys: %v", flat)
|
||||
}
|
||||
|
||||
// Confirm that Priority is appropriately set on the JobDispatchRequest
|
||||
if req.Priority < structs.JobMinPriority || req.Priority > config.JobMaxPriority {
|
||||
return fmt.Errorf("dispatch job priority must be between [%d, %d]", structs.JobMinPriority, config.JobMaxPriority)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -6665,7 +6665,8 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
reqInputDataTooLarge := &structs.JobDispatchRequest{
|
||||
Payload: make([]byte, DispatchPayloadSizeLimit+100),
|
||||
}
|
||||
|
||||
reqNoInputValidPriority := &structs.JobDispatchRequest{Priority: 55}
|
||||
reqNoInputInvalidPriority := &structs.JobDispatchRequest{Priority: -1}
|
||||
type existingIdempotentChildJob struct {
|
||||
isTerminal bool
|
||||
}
|
||||
@@ -6675,131 +6676,164 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
parameterizedJob *structs.Job
|
||||
dispatchReq *structs.JobDispatchRequest
|
||||
noEval bool
|
||||
err bool
|
||||
expectError bool
|
||||
errStr string
|
||||
idempotencyToken string
|
||||
existingIdempotentJob *existingIdempotentChildJob
|
||||
expectedPriority int
|
||||
}
|
||||
cases := []testCase{
|
||||
{
|
||||
name: "optional input data w/ data",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "optional input data w/o data",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "require input data w/ data",
|
||||
parameterizedJob: d2,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "require input data w/o data",
|
||||
parameterizedJob: d2,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: true,
|
||||
expectError: true,
|
||||
errStr: "not provided but required",
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "disallow input data w/o data",
|
||||
parameterizedJob: d3,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "disallow input data w/ data",
|
||||
parameterizedJob: d3,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: true,
|
||||
expectError: true,
|
||||
errStr: "provided but forbidden",
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "require meta w/ meta",
|
||||
parameterizedJob: d4,
|
||||
dispatchReq: reqInputDataMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "require meta w/o meta",
|
||||
parameterizedJob: d4,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: true,
|
||||
expectError: true,
|
||||
errStr: "did not provide required meta keys",
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "optional meta w/ meta",
|
||||
parameterizedJob: d5,
|
||||
dispatchReq: reqNoInputDataMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "optional meta w/o meta",
|
||||
parameterizedJob: d5,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "optional meta w/ bad meta",
|
||||
parameterizedJob: d5,
|
||||
dispatchReq: reqBadMeta,
|
||||
err: true,
|
||||
expectError: true,
|
||||
errStr: "unpermitted metadata keys",
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "optional input w/ too big of input",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqInputDataTooLarge,
|
||||
err: true,
|
||||
expectError: true,
|
||||
errStr: "Payload exceeds maximum size",
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "periodic job dispatched, ensure no eval",
|
||||
parameterizedJob: d6,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
noEval: true,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "periodic job stopped, ensure error",
|
||||
parameterizedJob: d7,
|
||||
dispatchReq: reqNoInputNoMeta,
|
||||
err: true,
|
||||
expectError: true,
|
||||
errStr: "stopped",
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "idempotency token, no existing child job",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
idempotencyToken: "foo",
|
||||
existingIdempotentJob: nil,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "idempotency token, w/ existing non-terminal child job",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
idempotencyToken: "foo",
|
||||
existingIdempotentJob: &existingIdempotentChildJob{
|
||||
isTerminal: false,
|
||||
},
|
||||
noEval: true,
|
||||
noEval: true,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "idempotency token, w/ existing terminal job",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqInputDataNoMeta,
|
||||
err: false,
|
||||
expectError: false,
|
||||
idempotencyToken: "foo",
|
||||
existingIdempotentJob: &existingIdempotentChildJob{
|
||||
isTerminal: true,
|
||||
},
|
||||
noEval: true,
|
||||
noEval: true,
|
||||
expectedPriority: 50,
|
||||
},
|
||||
{
|
||||
name: "valid priority",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqNoInputValidPriority,
|
||||
expectError: false,
|
||||
expectedPriority: reqNoInputValidPriority.Priority,
|
||||
},
|
||||
{
|
||||
name: "invalid priority",
|
||||
parameterizedJob: d1,
|
||||
dispatchReq: reqNoInputInvalidPriority,
|
||||
expectError: true,
|
||||
errStr: "priority must be between",
|
||||
expectedPriority: reqNoInputInvalidPriority.Priority,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -6823,9 +6857,8 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
|
||||
// Fetch the response
|
||||
var regResp structs.JobRegisterResponse
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Job.Register", regReq, ®Resp); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Register", regReq, ®Resp)
|
||||
must.NoError(t, err)
|
||||
|
||||
// Now try to dispatch
|
||||
tc.dispatchReq.JobID = tc.parameterizedJob.ID
|
||||
@@ -6838,107 +6871,66 @@ func TestJobEndpoint_Dispatch(t *testing.T) {
|
||||
// Dispatch with the same request so a child job w/ the idempotency key exists
|
||||
var initialIdempotentDispatchResp structs.JobDispatchResponse
|
||||
if tc.existingIdempotentJob != nil {
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialIdempotentDispatchResp); err != nil {
|
||||
t.Fatalf("Unexpected error dispatching initial idempotent job: %v", err)
|
||||
}
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &initialIdempotentDispatchResp)
|
||||
must.NoError(t, err)
|
||||
|
||||
if tc.existingIdempotentJob.isTerminal {
|
||||
eval, err := s1.State().EvalByID(nil, initialIdempotentDispatchResp.EvalID)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error fetching eval %v", err)
|
||||
}
|
||||
must.NoError(t, err)
|
||||
|
||||
eval = eval.Copy()
|
||||
eval.Status = structs.EvalStatusComplete
|
||||
err = s1.State().UpsertEvals(structs.MsgTypeTestSetup, initialIdempotentDispatchResp.Index+1, []*structs.Evaluation{eval})
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error completing eval %v", err)
|
||||
}
|
||||
must.NoError(t, err)
|
||||
}
|
||||
}
|
||||
|
||||
var dispatchResp structs.JobDispatchResponse
|
||||
dispatchErr := msgpackrpc.CallWithCodec(codec, "Job.Dispatch", tc.dispatchReq, &dispatchResp)
|
||||
|
||||
if dispatchErr == nil {
|
||||
if tc.err {
|
||||
t.Fatalf("Expected error: %v", dispatchErr)
|
||||
}
|
||||
|
||||
// Check that we got an eval and job id back
|
||||
switch dispatchResp.EvalID {
|
||||
case "":
|
||||
if !tc.noEval {
|
||||
t.Fatalf("Bad response")
|
||||
}
|
||||
default:
|
||||
if tc.noEval {
|
||||
t.Fatalf("Got eval %q", dispatchResp.EvalID)
|
||||
}
|
||||
}
|
||||
|
||||
if dispatchResp.DispatchedJobID == "" {
|
||||
t.Fatalf("Bad response")
|
||||
}
|
||||
|
||||
state := s1.fsm.State()
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := state.JobByID(ws, tc.parameterizedJob.Namespace, dispatchResp.DispatchedJobID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if out == nil {
|
||||
t.Fatalf("expected job")
|
||||
}
|
||||
if out.CreateIndex != dispatchResp.JobCreateIndex {
|
||||
t.Fatalf("index mis-match")
|
||||
}
|
||||
if out.ParentID != tc.parameterizedJob.ID {
|
||||
t.Fatalf("bad parent ID")
|
||||
}
|
||||
if !out.Dispatched {
|
||||
t.Fatal("expected dispatched job")
|
||||
}
|
||||
if out.IsParameterized() {
|
||||
t.Fatal("dispatched job should not be parameterized")
|
||||
}
|
||||
if out.ParameterizedJob == nil {
|
||||
t.Fatal("parameter job config should exist")
|
||||
}
|
||||
|
||||
// Check that the existing job is returned in the case of a supplied idempotency token
|
||||
if tc.idempotencyToken != "" && tc.existingIdempotentJob != nil {
|
||||
if dispatchResp.DispatchedJobID != initialIdempotentDispatchResp.DispatchedJobID {
|
||||
t.Fatal("dispatched job id should match initial dispatch")
|
||||
}
|
||||
|
||||
if dispatchResp.JobCreateIndex != initialIdempotentDispatchResp.JobCreateIndex {
|
||||
t.Fatal("dispatched job create index should match initial dispatch")
|
||||
}
|
||||
}
|
||||
|
||||
if tc.noEval {
|
||||
return
|
||||
}
|
||||
|
||||
// Lookup the evaluation
|
||||
eval, err := state.EvalByID(ws, dispatchResp.EvalID)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if eval == nil {
|
||||
t.Fatalf("expected eval")
|
||||
}
|
||||
if eval.CreateIndex != dispatchResp.EvalCreateIndex {
|
||||
t.Fatalf("index mis-match")
|
||||
}
|
||||
if tc.expectError {
|
||||
must.ErrorContains(t, dispatchErr, tc.errStr)
|
||||
return
|
||||
} else {
|
||||
if !tc.err {
|
||||
t.Fatalf("Got unexpected error: %v", dispatchErr)
|
||||
} else if !strings.Contains(dispatchErr.Error(), tc.errStr) {
|
||||
t.Fatalf("Expected err to include %q; got %v", tc.errStr, dispatchErr)
|
||||
}
|
||||
must.NoError(t, dispatchErr)
|
||||
}
|
||||
|
||||
// Check that we got an eval and job id back
|
||||
must.NotEq(t, dispatchResp.DispatchedJobID, "")
|
||||
if tc.noEval {
|
||||
must.Eq(t, dispatchResp.EvalID, "")
|
||||
} else {
|
||||
must.NotEq(t, dispatchResp.EvalID, "")
|
||||
}
|
||||
|
||||
state := s1.fsm.State()
|
||||
ws := memdb.NewWatchSet()
|
||||
out, err := state.JobByID(ws, tc.parameterizedJob.Namespace, dispatchResp.DispatchedJobID)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, out)
|
||||
|
||||
// Check job characteristics
|
||||
must.Eq(t, out.CreateIndex, dispatchResp.JobCreateIndex)
|
||||
must.Eq(t, out.ParentID, tc.parameterizedJob.ID)
|
||||
must.Eq(t, out.Dispatched, true)
|
||||
must.Eq(t, out.IsParameterized(), false)
|
||||
must.NotNil(t, out.ParameterizedJob)
|
||||
must.Eq(t, tc.expectedPriority, out.Priority)
|
||||
|
||||
// Check that the existing job is returned in the case of a supplied idempotency token
|
||||
if tc.idempotencyToken != "" && tc.existingIdempotentJob != nil {
|
||||
must.Eq(t, dispatchResp.DispatchedJobID, initialIdempotentDispatchResp.DispatchedJobID)
|
||||
must.Eq(t, dispatchResp.JobCreateIndex, initialIdempotentDispatchResp.JobCreateIndex)
|
||||
}
|
||||
|
||||
if tc.noEval {
|
||||
return
|
||||
}
|
||||
|
||||
// Lookup the evaluation
|
||||
eval, err := state.EvalByID(ws, dispatchResp.EvalID)
|
||||
must.NoError(t, err)
|
||||
must.NotNil(t, eval)
|
||||
must.Eq(t, eval.CreateIndex, dispatchResp.EvalCreateIndex)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -934,6 +934,7 @@ type JobDispatchRequest struct {
|
||||
Meta map[string]string
|
||||
WriteRequest
|
||||
IdPrefixTemplate string
|
||||
Priority int
|
||||
}
|
||||
|
||||
// JobValidateRequest is used to validate a job
|
||||
|
||||
@@ -78,6 +78,10 @@ dispatching parameterized jobs.
|
||||
|
||||
- `-ui`: Open the dispatched job in the browser.
|
||||
|
||||
- `-priority`: Optional integer between 1 and [`job_max_priority`],
|
||||
inclusively that overrides any priority value inherited from the parent job.
|
||||
For general usage information on the priority flag see [job parameters].
|
||||
|
||||
## Examples
|
||||
|
||||
Dispatch against a parameterized job with the ID "video-encode" and
|
||||
@@ -166,3 +170,5 @@ Evaluation ID = 31199841
|
||||
[eval status]: /nomad/docs/commands/eval/status
|
||||
[parameterized job]: /nomad/docs/job-specification/parameterized 'Nomad parameterized Job Specification'
|
||||
[multiregion]: /nomad/docs/job-specification/multiregion#parameterized-dispatch
|
||||
[`job_max_priority`]: /nomad/docs/configuration/server#job_max_priority
|
||||
[job parameters]: /nomad/docs/job-specification/job#job-parameters
|
||||
|
||||
Reference in New Issue
Block a user