From 02209b1371f1ddfda4a35c2aa6d511eb9f30b790 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 15 Jun 2020 16:18:14 -0400 Subject: [PATCH] Multiregion job registration Integration points for multiregion jobs to be registered in the enterprise version of Nomad: * hook in `Job.Register` for enterprise to send job to peer regions * remove monitoring from `nomad job run` and `nomad job stop` for multiregion jobs --- api/jobs.go | 5 ++++ command/agent/job_endpoint.go | 30 ++++++++++++++----- command/agent/job_endpoint_oss.go | 11 +++++++ command/job_run.go | 3 +- command/job_stop.go | 2 +- command/util_test.go | 1 + nomad/job_endpoint.go | 7 +++++ nomad/job_endpoint_oss.go | 5 ++++ nomad/mock/mock.go | 25 ++++++++++++++++ nomad/structs/structs.go | 2 +- vendor/github.com/hashicorp/nomad/api/jobs.go | 5 ++++ 11 files changed, 85 insertions(+), 11 deletions(-) create mode 100644 command/agent/job_endpoint_oss.go diff --git a/api/jobs.go b/api/jobs.go index fbb3b2417..17b80834c 100644 --- a/api/jobs.go +++ b/api/jobs.go @@ -814,6 +814,11 @@ func (j *Job) IsParameterized() bool { return j.ParameterizedJob != nil && !j.Dispatched } +// IsMultiregion returns whether a job is a multiregion job +func (j *Job) IsMultiregion() bool { + return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0 +} + func (j *Job) Canonicalize() { if j.ID == nil { j.ID = stringToPtr("") diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index fb9573905..2351e1221 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -146,16 +146,20 @@ func (s *HTTPServer) jobPlan(resp http.ResponseWriter, req *http.Request, return nil, CodedError(400, "Job ID does not match") } + var region *string + // Region in http request query param takes precedence over region in job hcl config if args.WriteRequest.Region != "" { - args.Job.Region = helper.StringToPtr(args.WriteRequest.Region) + region = helper.StringToPtr(args.WriteRequest.Region) } // If 'global' region is specified or if no region is given, // default to region of the node you're submitting to - if args.Job.Region == nil || *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion { - args.Job.Region = &s.agent.config.Region + if region == nil || args.Job.Region == nil || + *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion { + region = &s.agent.config.Region } + args.Job.Region = regionForJob(args.Job, region) sJob := ApiJobToStructJob(args.Job) planReq := structs.JobPlanRequest{ @@ -163,7 +167,7 @@ func (s *HTTPServer) jobPlan(resp http.ResponseWriter, req *http.Request, Diff: args.Diff, PolicyOverride: args.PolicyOverride, WriteRequest: structs.WriteRequest{ - Region: sJob.Region, + Region: *region, }, } // parseWriteRequest overrides Namespace, Region and AuthToken @@ -395,17 +399,27 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, if jobName != "" && *args.Job.ID != jobName { return nil, CodedError(400, "Job ID does not match name") } + if args.Job.Multiregion != nil && args.Job.Region != nil { + region := *args.Job.Region + if !(region == "global" || region == "") { + return nil, CodedError(400, "Job can't have both multiregion and region blocks") + } + } + + var region *string // Region in http request query param takes precedence over region in job hcl config if args.WriteRequest.Region != "" { - args.Job.Region = helper.StringToPtr(args.WriteRequest.Region) + region = helper.StringToPtr(args.WriteRequest.Region) } // If 'global' region is specified or if no region is given, // default to region of the node you're submitting to - if args.Job.Region == nil || *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion { - args.Job.Region = &s.agent.config.Region + if region == nil || args.Job.Region == nil || + *args.Job.Region == "" || *args.Job.Region == api.GlobalRegion { + region = &s.agent.config.Region } + args.Job.Region = regionForJob(args.Job, region) sJob := ApiJobToStructJob(args.Job) regReq := structs.JobRegisterRequest{ @@ -415,7 +429,7 @@ func (s *HTTPServer) jobUpdate(resp http.ResponseWriter, req *http.Request, PolicyOverride: args.PolicyOverride, PreserveCounts: args.PreserveCounts, WriteRequest: structs.WriteRequest{ - Region: sJob.Region, + Region: *region, AuthToken: args.WriteRequest.SecretID, }, } diff --git a/command/agent/job_endpoint_oss.go b/command/agent/job_endpoint_oss.go new file mode 100644 index 000000000..58d34d5b1 --- /dev/null +++ b/command/agent/job_endpoint_oss.go @@ -0,0 +1,11 @@ +// +build !ent + +package agent + +import ( + "github.com/hashicorp/nomad/api" +) + +func regionForJob(job *api.Job, requestRegion *string) *string { + return requestRegion +} diff --git a/command/job_run.go b/command/job_run.go index b52ae2b0a..3af389b21 100644 --- a/command/job_run.go +++ b/command/job_run.go @@ -191,6 +191,7 @@ func (c *JobRunCommand) Run(args []string) int { // Check if the job is periodic or is a parameterized job periodic := job.IsPeriodic() paramjob := job.IsParameterized() + multiregion := job.IsMultiregion() // Parse the Consul token if consulToken == "" { @@ -271,7 +272,7 @@ func (c *JobRunCommand) Run(args []string) int { evalID := resp.EvalID // Check if we should enter monitor mode - if detach || periodic || paramjob { + if detach || periodic || paramjob || multiregion { c.Ui.Output("Job registration successful") if periodic && !paramjob { loc, err := job.Periodic.GetLocation() diff --git a/command/job_stop.go b/command/job_stop.go index 9f05e0d90..c6d088592 100644 --- a/command/job_stop.go +++ b/command/job_stop.go @@ -171,7 +171,7 @@ func (c *JobStopCommand) Run(args []string) int { return 0 } - if detach { + if detach || job.IsMultiregion() { c.Ui.Output(evalID) return 0 } diff --git a/command/util_test.go b/command/util_test.go index a7060742a..8abd37860 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -70,6 +70,7 @@ func testMultiRegionJob(jobID, region, datacenter string) *api.Job { }) job := api.NewServiceJob(jobID, jobID, region, 1).AddDatacenter(datacenter).AddTaskGroup(group) + job.Region = nil job.Multiregion = &api.Multiregion{ Regions: []*api.MultiregionRegion{ { diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 1d68217aa..52b3ff790 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -291,6 +291,13 @@ func (j *Job) Register(args *structs.JobRegisterRequest, reply *structs.JobRegis } } + // Submit a multiregion job to other regions (enterprise only). + // The job will have its region interpolated. + err = j.multiregionRegister(args, reply) + if err != nil { + return err + } + // Check if the job has changed at all if existingJob == nil || existingJob.SpecChanged(args.Job) { // Set the submit time diff --git a/nomad/job_endpoint_oss.go b/nomad/job_endpoint_oss.go index fd4aaa8e7..214b93412 100644 --- a/nomad/job_endpoint_oss.go +++ b/nomad/job_endpoint_oss.go @@ -8,3 +8,8 @@ import "github.com/hashicorp/nomad/nomad/structs" func (j *Job) enforceSubmitJob(override bool, job *structs.Job) (error, error) { return nil, nil } + +// multiregionRegister is used to send a job across multiple regions +func (j *Job) multiregionRegister(args *structs.JobRegisterRequest, reply *structs.JobRegisterResponse) error { + return nil +} diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index 23738503c..2111489b7 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -1287,6 +1287,31 @@ func JobWithScalingPolicy() (*structs.Job, *structs.ScalingPolicy) { return job, policy } +func MultiregionJob() *structs.Job { + job := Job() + job.Multiregion = &structs.Multiregion{ + Strategy: &structs.MultiregionStrategy{ + MaxParallel: 1, + AutoRevert: "all", + }, + Regions: []*structs.MultiregionRegion{ + { + Name: "west", + Count: 2, + Datacenters: []string{"west-1", "west-2"}, + Meta: map[string]string{"region_code": "W"}, + }, + { + Name: "east", + Count: 1, + Datacenters: []string{"east-1"}, + Meta: map[string]string{"region_code": "E"}, + }, + }, + } + return job +} + func CSIPlugin() *structs.CSIPlugin { return &structs.CSIPlugin{ ID: uuid.Generate(), diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 76a15753b..73f19c795 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3748,7 +3748,7 @@ func (j *Job) Copy() *Job { func (j *Job) Validate() error { var mErr multierror.Error - if j.Region == "" { + if j.Region == "" && j.Multiregion == nil { mErr.Errors = append(mErr.Errors, errors.New("Missing job region")) } if j.ID == "" { diff --git a/vendor/github.com/hashicorp/nomad/api/jobs.go b/vendor/github.com/hashicorp/nomad/api/jobs.go index fbb3b2417..17b80834c 100644 --- a/vendor/github.com/hashicorp/nomad/api/jobs.go +++ b/vendor/github.com/hashicorp/nomad/api/jobs.go @@ -814,6 +814,11 @@ func (j *Job) IsParameterized() bool { return j.ParameterizedJob != nil && !j.Dispatched } +// IsMultiregion returns whether a job is a multiregion job +func (j *Job) IsMultiregion() bool { + return j.Multiregion != nil && j.Multiregion.Regions != nil && len(j.Multiregion.Regions) > 0 +} + func (j *Job) Canonicalize() { if j.ID == nil { j.ID = stringToPtr("")