Multiregion job registration

Integration points for multiregion jobs to be registered in the enterprise
version of Nomad:
* hook in `Job.Register` for enterprise to send job to peer regions
* remove monitoring from `nomad job run` and `nomad job stop` for multiregion jobs
This commit is contained in:
Tim Gross
2020-06-15 16:18:14 -04:00
parent e620ff7f0f
commit 02209b1371
11 changed files with 85 additions and 11 deletions

View File

@@ -814,6 +814,11 @@ func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil && !j.Dispatched
}
// IsMultiregion returns whether a job is a multiregion job
func (j *Job) IsMultiregion() bool {
return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0
}
func (j *Job) Canonicalize() {
if j.ID == nil {
j.ID = stringToPtr("")

View File

@@ -146,16 +146,20 @@ func (s *HTTPServer) jobPlan(resp http.ResponseWriter, req *http.Request,
return nil, CodedError(400, "Job ID does not match")
}
var region *string
// Region in http request query param takes precedence over region in job hcl config
if args.WriteRequest.Region != "" {
args.Job.Region = helper.StringToPtr(args.WriteRequest.Region)
region = helper.StringToPtr(args.WriteRequest.Region)
}
// If 'global' region is specified or if no region is given,
// default to region of the node you're submitting to
if args.Job.Region == nil || *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion {
args.Job.Region = &s.agent.config.Region
if region == nil || args.Job.Region == nil ||
*args.Job.Region == "" || *args.Job.Region == api.GlobalRegion {
region = &s.agent.config.Region
}
args.Job.Region = regionForJob(args.Job, region)
sJob := ApiJobToStructJob(args.Job)
planReq := structs.JobPlanRequest{
@@ -163,7 +167,7 @@ func (s *HTTPServer) jobPlan(resp http.ResponseWriter, req *http.Request,
Diff: args.Diff,
PolicyOverride: args.PolicyOverride,
WriteRequest: structs.WriteRequest{
Region: sJob.Region,
Region: *region,
},
}
// parseWriteRequest overrides Namespace, Region and AuthToken
@@ -395,17 +399,27 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
if jobName != "" && *args.Job.ID != jobName {
return nil, CodedError(400, "Job ID does not match name")
}
if args.Job.Multiregion != nil && args.Job.Region != nil {
region := *args.Job.Region
if !(region == "global" || region == "") {
return nil, CodedError(400, "Job can't have both multiregion and region blocks")
}
}
var region *string
// Region in http request query param takes precedence over region in job hcl config
if args.WriteRequest.Region != "" {
args.Job.Region = helper.StringToPtr(args.WriteRequest.Region)
region = helper.StringToPtr(args.WriteRequest.Region)
}
// If 'global' region is specified or if no region is given,
// default to region of the node you're submitting to
if args.Job.Region == nil || *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion {
args.Job.Region = &s.agent.config.Region
if region == nil || args.Job.Region == nil ||
*args.Job.Region == "" || *args.Job.Region == api.GlobalRegion {
region = &s.agent.config.Region
}
args.Job.Region = regionForJob(args.Job, region)
sJob := ApiJobToStructJob(args.Job)
regReq := structs.JobRegisterRequest{
@@ -415,7 +429,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request,
PolicyOverride: args.PolicyOverride,
PreserveCounts: args.PreserveCounts,
WriteRequest: structs.WriteRequest{
Region: sJob.Region,
Region: *region,
AuthToken: args.WriteRequest.SecretID,
},
}

View File

@@ -0,0 +1,11 @@
// +build !ent
package agent
import (
"github.com/hashicorp/nomad/api"
)
func regionForJob(job *api.Job, requestRegion *string) *string {
return requestRegion
}

View File

@@ -191,6 +191,7 @@ func (c *JobRunCommand) Run(args []string) int {
// Check if the job is periodic or is a parameterized job
periodic := job.IsPeriodic()
paramjob := job.IsParameterized()
multiregion := job.IsMultiregion()
// Parse the Consul token
if consulToken == "" {
@@ -271,7 +272,7 @@ func (c *JobRunCommand) Run(args []string) int {
evalID := resp.EvalID
// Check if we should enter monitor mode
if detach || periodic || paramjob {
if detach || periodic || paramjob || multiregion {
c.Ui.Output("Job registration successful")
if periodic && !paramjob {
loc, err := job.Periodic.GetLocation()

View File

@@ -171,7 +171,7 @@ func (c *JobStopCommand) Run(args []string) int {
return 0
}
if detach {
if detach || job.IsMultiregion() {
c.Ui.Output(evalID)
return 0
}

View File

@@ -70,6 +70,7 @@ func testMultiRegionJob(jobID, region, datacenter string) *api.Job {
})
job := api.NewServiceJob(jobID, jobID, region, 1).AddDatacenter(datacenter).AddTaskGroup(group)
job.Region = nil
job.Multiregion = &api.Multiregion{
Regions: []*api.MultiregionRegion{
{

View File

@@ -291,6 +291,13 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis
}
}
// Submit a multiregion job to other regions (enterprise only).
// The job will have its region interpolated.
err = j.multiregionRegister(args, reply)
if err != nil {
return err
}
// Check if the job has changed at all
if existingJob == nil || existingJob.SpecChanged(args.Job) {
// Set the submit time

View File

@@ -8,3 +8,8 @@ import "github.com/hashicorp/nomad/nomad/structs"
func (j *Job) enforceSubmitJob(override bool, job *structs.Job) (error, error) {
return nil, nil
}
// multiregionRegister is used to send a job across multiple regions
func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse) error {
return nil
}

View File

@@ -1287,6 +1287,31 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) {
return job, policy
}
func MultiregionJob() *structs.Job {
job := Job()
job.Multiregion = &structs.Multiregion{
Strategy: &structs.MultiregionStrategy{
MaxParallel: 1,
AutoRevert: "all",
},
Regions: []*structs.MultiregionRegion{
{
Name: "west",
Count: 2,
Datacenters: []string{"west-1", "west-2"},
Meta: map[string]string{"region_code": "W"},
},
{
Name: "east",
Count: 1,
Datacenters: []string{"east-1"},
Meta: map[string]string{"region_code": "E"},
},
},
}
return job
}
func CSIPlugin() *structs.CSIPlugin {
return &structs.CSIPlugin{
ID: uuid.Generate(),

View File

@@ -3748,7 +3748,7 @@ func (j *Job) Copy() *Job {
func (j *Job) Validate() error {
var mErr multierror.Error
if j.Region == "" {
if j.Region == "" && j.Multiregion == nil {
mErr.Errors = append(mErr.Errors, errors.New("Missing job region"))
}
if j.ID == "" {

View File

@@ -814,6 +814,11 @@ func (j *Job) IsParameterized() bool {
return j.ParameterizedJob != nil && !j.Dispatched
}
// IsMultiregion returns whether a job is a multiregion job
func (j *Job) IsMultiregion() bool {
return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0
}
func (j *Job) Canonicalize() {
if j.ID == nil {
j.ID = stringToPtr("")