Merge pull request #9586 from hashicorp/f-connect-interp

consul/connect: interpolate connect block
Seth Hoenig
2020-12-09 13:21:50 -06:00
committed by GitHub
6 changed files with 546 additions and 68 deletions


@@ -1,3 +1,8 @@
## 1.0.1 (Unreleased)
IMPROVEMENTS:
* consul/connect: interpolate the connect, service meta, and service canary meta blocks with the task environment [[GH-9586](https://github.com/hashicorp/nomad/pull/9586)]
## 1.0.0 (December 8, 2020)
FEATURES:
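To picture the improvement listed above from the job author's side, here is a minimal Go sketch in the spirit of the tests added later in this diff: a connect upstream field that references a task environment variable comes back from taskenv.InterpolateServices with the reference resolved. The service shape, variable name, and values ("web", "upstream_dc", "dc2", "api") are illustrative and not taken from the PR; the constructor call mirrors the three-argument NewTaskEnv usage in the test file below.

```go
// Illustrative sketch only: names and values are made up, but the calls
// (taskenv.NewTaskEnv with three arguments, taskenv.InterpolateServices)
// mirror the ones exercised by the tests in this diff.
package main

import (
	"fmt"

	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	// Environment the task would see at runtime.
	env := taskenv.NewTaskEnv(map[string]string{"upstream_dc": "dc2"}, nil, nil)

	services := []*structs.Service{{
		Name:      "web",
		PortLabel: "http",
		Connect: &structs.ConsulConnect{
			SidecarService: &structs.ConsulSidecarService{
				Proxy: &structs.ConsulProxy{
					Upstreams: []structs.ConsulUpstream{{
						DestinationName: "api",
						Datacenter:      "${upstream_dc}", // now interpolated
						LocalBindPort:   8080,
					}},
				},
			},
		},
	}}

	out := taskenv.InterpolateServices(env, services)
	fmt.Println(out[0].Connect.SidecarService.Proxy.Upstreams[0].Datacenter) // dc2
}
```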


@@ -16,8 +16,8 @@ func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*structs.Service {
interpolated := make([]*structs.Service, len(services))
for i, origService := range services {
// Create a copy as we need to re-interpolate every time the
// environment changes.
service := origService.Copy()
for _, check := range service.Checks {
@@ -31,42 +31,171 @@ func InterpolateServices(taskEnv *TaskEnv, services []*structs.Service) []*structs.Service {
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
check.Method = taskEnv.ReplaceEnv(check.Method)
check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
if len(check.Header) > 0 {
header := make(map[string][]string, len(check.Header))
for k, vs := range check.Header {
newVals := make([]string, len(vs))
for i, v := range vs {
newVals[i] = taskEnv.ReplaceEnv(v)
}
header[taskEnv.ReplaceEnv(k)] = newVals
}
check.Header = header
}
check.Header = interpolateMapStringSliceString(taskEnv, check.Header)
}
service.Name = taskEnv.ReplaceEnv(service.Name)
service.PortLabel = taskEnv.ReplaceEnv(service.PortLabel)
service.Tags = taskEnv.ParseAndReplace(service.Tags)
service.CanaryTags = taskEnv.ParseAndReplace(service.CanaryTags)
if len(service.Meta) > 0 {
meta := make(map[string]string, len(service.Meta))
for k, v := range service.Meta {
meta[k] = taskEnv.ReplaceEnv(v)
}
service.Meta = meta
}
if len(service.CanaryMeta) > 0 {
canaryMeta := make(map[string]string, len(service.CanaryMeta))
for k, v := range service.CanaryMeta {
canaryMeta[k] = taskEnv.ReplaceEnv(v)
}
service.CanaryMeta = canaryMeta
}
service.Meta = interpolateMapStringString(taskEnv, service.Meta)
service.CanaryMeta = interpolateMapStringString(taskEnv, service.CanaryMeta)
interpolateConnect(taskEnv, service.Connect)
interpolated[i] = service
}
return interpolated
}
func interpolateMapStringSliceString(taskEnv *TaskEnv, orig map[string][]string) map[string][]string {
if len(orig) == 0 {
return nil
}
m := make(map[string][]string, len(orig))
for k, vs := range orig {
m[taskEnv.ReplaceEnv(k)] = taskEnv.ParseAndReplace(vs)
}
return m
}
func interpolateMapStringString(taskEnv *TaskEnv, orig map[string]string) map[string]string {
if len(orig) == 0 {
return nil
}
m := make(map[string]string, len(orig))
for k, v := range orig {
m[taskEnv.ReplaceEnv(k)] = taskEnv.ReplaceEnv(v)
}
return m
}
func interpolateMapStringInterface(taskEnv *TaskEnv, orig map[string]interface{}) map[string]interface{} {
if len(orig) == 0 {
return nil
}
m := make(map[string]interface{}, len(orig))
for k, v := range orig {
m[taskEnv.ReplaceEnv(k)] = v
}
return m
}
func interpolateConnect(taskEnv *TaskEnv, connect *structs.ConsulConnect) {
if connect == nil {
return
}
interpolateConnectSidecarService(taskEnv, connect.SidecarService)
interpolateConnectSidecarTask(taskEnv, connect.SidecarTask)
if connect.Gateway != nil {
interpolateConnectGatewayProxy(taskEnv, connect.Gateway.Proxy)
interpolateConnectGatewayIngress(taskEnv, connect.Gateway.Ingress)
}
}
func interpolateConnectGatewayProxy(taskEnv *TaskEnv, proxy *structs.ConsulGatewayProxy) {
if proxy == nil {
return
}
m := make(map[string]*structs.ConsulGatewayBindAddress, len(proxy.EnvoyGatewayBindAddresses))
for k, v := range proxy.EnvoyGatewayBindAddresses {
m[taskEnv.ReplaceEnv(k)] = &structs.ConsulGatewayBindAddress{
Address: taskEnv.ReplaceEnv(v.Address),
Port: v.Port,
}
}
proxy.EnvoyGatewayBindAddresses = m
proxy.Config = interpolateMapStringInterface(taskEnv, proxy.Config)
}
func interpolateConnectGatewayIngress(taskEnv *TaskEnv, ingress *structs.ConsulIngressConfigEntry) {
if ingress == nil {
return
}
for _, listener := range ingress.Listeners {
listener.Protocol = taskEnv.ReplaceEnv(listener.Protocol)
for _, service := range listener.Services {
service.Name = taskEnv.ReplaceEnv(service.Name)
service.Hosts = taskEnv.ParseAndReplace(service.Hosts)
}
}
}
func interpolateConnectSidecarService(taskEnv *TaskEnv, sidecar *structs.ConsulSidecarService) {
if sidecar == nil {
return
}
sidecar.Port = taskEnv.ReplaceEnv(sidecar.Port)
sidecar.Tags = taskEnv.ParseAndReplace(sidecar.Tags)
if sidecar.Proxy != nil {
sidecar.Proxy.LocalServiceAddress = taskEnv.ReplaceEnv(sidecar.Proxy.LocalServiceAddress)
if sidecar.Proxy.Expose != nil {
for i := 0; i < len(sidecar.Proxy.Expose.Paths); i++ {
sidecar.Proxy.Expose.Paths[i].Protocol = taskEnv.ReplaceEnv(sidecar.Proxy.Expose.Paths[i].Protocol)
sidecar.Proxy.Expose.Paths[i].ListenerPort = taskEnv.ReplaceEnv(sidecar.Proxy.Expose.Paths[i].ListenerPort)
sidecar.Proxy.Expose.Paths[i].Path = taskEnv.ReplaceEnv(sidecar.Proxy.Expose.Paths[i].Path)
}
}
for i := 0; i < len(sidecar.Proxy.Upstreams); i++ {
sidecar.Proxy.Upstreams[i].Datacenter = taskEnv.ReplaceEnv(sidecar.Proxy.Upstreams[i].Datacenter)
sidecar.Proxy.Upstreams[i].DestinationName = taskEnv.ReplaceEnv(sidecar.Proxy.Upstreams[i].DestinationName)
}
sidecar.Proxy.Config = interpolateMapStringInterface(taskEnv, sidecar.Proxy.Config)
}
}
func interpolateConnectSidecarTask(taskEnv *TaskEnv, task *structs.SidecarTask) {
if task == nil {
return
}
task.Driver = taskEnv.ReplaceEnv(task.Driver)
task.Config = interpolateMapStringInterface(taskEnv, task.Config)
task.Env = interpolateMapStringString(taskEnv, task.Env)
task.KillSignal = taskEnv.ReplaceEnv(task.KillSignal)
task.Meta = interpolateMapStringString(taskEnv, task.Meta)
interpolateTaskResources(taskEnv, task.Resources)
task.User = taskEnv.ReplaceEnv(task.User)
}
func interpolateTaskResources(taskEnv *TaskEnv, resources *structs.Resources) {
if resources == nil {
return
}
for i := 0; i < len(resources.Devices); i++ {
resources.Devices[i].Name = taskEnv.ReplaceEnv(resources.Devices[i].Name)
// do not interpolate constraints & affinities
}
for i := 0; i < len(resources.Networks); i++ {
resources.Networks[i].CIDR = taskEnv.ReplaceEnv(resources.Networks[i].CIDR)
resources.Networks[i].Device = taskEnv.ReplaceEnv(resources.Networks[i].Device)
resources.Networks[i].IP = taskEnv.ReplaceEnv(resources.Networks[i].IP)
resources.Networks[i].Mode = taskEnv.ReplaceEnv(resources.Networks[i].Mode)
if resources.Networks[i].DNS != nil {
resources.Networks[i].DNS.Options = taskEnv.ParseAndReplace(resources.Networks[i].DNS.Options)
resources.Networks[i].DNS.Searches = taskEnv.ParseAndReplace(resources.Networks[i].DNS.Searches)
resources.Networks[i].DNS.Servers = taskEnv.ParseAndReplace(resources.Networks[i].DNS.Servers)
}
for p := 0; p < len(resources.Networks[i].DynamicPorts); p++ {
resources.Networks[i].DynamicPorts[p].HostNetwork = taskEnv.ReplaceEnv(resources.Networks[i].DynamicPorts[p].HostNetwork)
resources.Networks[i].DynamicPorts[p].Label = taskEnv.ReplaceEnv(resources.Networks[i].DynamicPorts[p].Label)
}
for p := 0; p < len(resources.Networks[i].ReservedPorts); p++ {
resources.Networks[i].ReservedPorts[p].HostNetwork = taskEnv.ReplaceEnv(resources.Networks[i].ReservedPorts[p].HostNetwork)
resources.Networks[i].ReservedPorts[p].Label = taskEnv.ReplaceEnv(resources.Networks[i].ReservedPorts[p].Label)
}
}
}
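One detail worth calling out in the helpers above: interpolateMapStringInterface interpolates only the map keys and copies the values through untouched, since the values may be arbitrarily nested types. A test-style sketch of that behavior, assuming it sits in package taskenv next to the existing tests; the test name, env key, and values are illustrative.

```go
package taskenv

// Sketch only (test-style, colocated with the helpers above): keys of a
// map[string]interface{} are interpolated, but values are copied through
// verbatim because they may be arbitrary nested types.

import (
	"testing"

	"github.com/stretchr/testify/require"
)

func TestInterpolate_mapStringInterface_keysOnly(t *testing.T) {
	env := NewTaskEnv(map[string]string{"key": "renamed"}, nil, nil)

	out := interpolateMapStringInterface(env, map[string]interface{}{
		"${key}": "${key}", // the value keeps its literal "${key}"
	})

	require.Equal(t, map[string]interface{}{"renamed": "${key}"}, out)
}
```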


@@ -2,7 +2,9 @@ package taskenv
import (
"testing"
"time"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/stretchr/testify/require"
)
@@ -11,6 +13,7 @@ import (
// and check fields are properly interpolated.
func TestInterpolateServices(t *testing.T) {
t.Parallel()
services := []*structs.Service{
{
Name: "${name}",
@@ -97,3 +100,317 @@ func TestInterpolateServices(t *testing.T) {
require.Equal(t, exp, interpolated)
}
var testEnv = NewTaskEnv(
map[string]string{"foo": "bar", "baz": "blah"},
nil,
nil,
)
func TestInterpolate_interpolateMapStringSliceString(t *testing.T) {
t.Parallel()
t.Run("nil", func(t *testing.T) {
require.Nil(t, interpolateMapStringSliceString(testEnv, nil))
})
t.Run("not nil", func(t *testing.T) {
require.Equal(t, map[string][]string{
"a": {"b"},
"bar": {"blah", "c"},
}, interpolateMapStringSliceString(testEnv, map[string][]string{
"a": {"b"},
"${foo}": {"${baz}", "c"},
}))
})
}
func TestInterpolate_interpolateMapStringString(t *testing.T) {
t.Parallel()
t.Run("nil", func(t *testing.T) {
require.Nil(t, interpolateMapStringString(testEnv, nil))
})
t.Run("not nil", func(t *testing.T) {
require.Equal(t, map[string]string{
"a": "b",
"bar": "blah",
}, interpolateMapStringString(testEnv, map[string]string{
"a": "b",
"${foo}": "${baz}",
}))
})
}
func TestInterpolate_interpolateMapStringInterface(t *testing.T) {
t.Parallel()
t.Run("nil", func(t *testing.T) {
require.Nil(t, interpolateMapStringInterface(testEnv, nil))
})
t.Run("not nil", func(t *testing.T) {
require.Equal(t, map[string]interface{}{
"a": 1,
"bar": 2,
}, interpolateMapStringInterface(testEnv, map[string]interface{}{
"a": 1,
"${foo}": 2,
}))
})
}
func TestInterpolate_interpolateConnect(t *testing.T) {
t.Parallel()
env := NewTaskEnv(map[string]string{
"tag1": "_tag1",
"port1": "12345",
"address1": "1.2.3.4",
"destination1": "_dest1",
"datacenter1": "_datacenter1",
"path1": "_path1",
"protocol1": "_protocol1",
"port2": "_port2",
"config1": "_config1",
"driver1": "_driver1",
"user1": "_user1",
"config2": "_config2",
"env1": "_env1",
"env2": "_env2",
"mode1": "_mode1",
"device1": "_device1",
"cidr1": "10.0.0.0/64",
"ip1": "1.1.1.1",
"server1": "10.0.0.1",
"search1": "10.0.0.2",
"option1": "10.0.0.3",
"port3": "_port3",
"network1": "_network1",
"port4": "_port4",
"network2": "_network2",
"resource1": "_resource1",
"meta1": "_meta1",
"meta2": "_meta2",
"signal1": "_signal1",
"bind1": "_bind1",
"address2": "10.0.0.4",
"config3": "_config3",
"protocol2": "_protocol2",
"service1": "_service1",
"host1": "_host1",
}, nil, nil)
connect := &structs.ConsulConnect{
Native: false,
SidecarService: &structs.ConsulSidecarService{
Tags: []string{"${tag1}", "tag2"},
Port: "${port1}",
Proxy: &structs.ConsulProxy{
LocalServiceAddress: "${address1}",
LocalServicePort: 10000,
Upstreams: []structs.ConsulUpstream{{
DestinationName: "${destination1}",
Datacenter: "${datacenter1}",
LocalBindPort: 10001,
}},
Expose: &structs.ConsulExposeConfig{
Paths: []structs.ConsulExposePath{{
Path: "${path1}",
Protocol: "${protocol1}",
ListenerPort: "${port2}",
LocalPathPort: 10002,
}},
},
Config: map[string]interface{}{
"${config1}": 1,
},
},
},
SidecarTask: &structs.SidecarTask{
Name: "name", // not interpolated by taskenv
Driver: "${driver1}",
User: "${user1}",
Config: map[string]interface{}{"${config2}": 2},
Env: map[string]string{"${env1}": "${env2}"},
Resources: &structs.Resources{
CPU: 1,
MemoryMB: 2,
DiskMB: 3,
IOPS: 4,
Networks: structs.Networks{{
Mode: "${mode1}",
Device: "${device1}",
CIDR: "${cidr1}",
IP: "${ip1}",
MBits: 1,
DNS: &structs.DNSConfig{
Servers: []string{"${server1}"},
Searches: []string{"${search1}"},
Options: []string{"${option1}"},
},
ReservedPorts: []structs.Port{{
Label: "${port3}",
Value: 9000,
To: 9000,
HostNetwork: "${network1}",
}},
DynamicPorts: []structs.Port{{
Label: "${port4}",
Value: 9001,
To: 9001,
HostNetwork: "${network2}",
}},
}},
Devices: structs.ResourceDevices{{
Name: "${resource1}",
}},
},
Meta: map[string]string{"${meta1}": "${meta2}"},
KillTimeout: helper.TimeToPtr(1 * time.Second),
LogConfig: &structs.LogConfig{
MaxFiles: 1,
MaxFileSizeMB: 2,
},
ShutdownDelay: helper.TimeToPtr(2 * time.Second),
KillSignal: "${signal1}",
},
Gateway: &structs.ConsulGateway{
Proxy: &structs.ConsulGatewayProxy{
ConnectTimeout: helper.TimeToPtr(3 * time.Second),
EnvoyGatewayBindTaggedAddresses: true,
EnvoyGatewayBindAddresses: map[string]*structs.ConsulGatewayBindAddress{
"${bind1}": {
Address: "${address2}",
Port: 8000,
},
},
EnvoyGatewayNoDefaultBind: true,
Config: map[string]interface{}{
"${config3}": 4,
},
},
Ingress: &structs.ConsulIngressConfigEntry{
TLS: &structs.ConsulGatewayTLSConfig{
Enabled: true,
},
Listeners: []*structs.ConsulIngressListener{{
Protocol: "${protocol2}",
Port: 8001,
Services: []*structs.ConsulIngressService{{
Name: "${service1}",
Hosts: []string{"${host1}", "host2"},
}},
}},
},
},
}
interpolateConnect(env, connect)
require.Equal(t, &structs.ConsulConnect{
Native: false,
SidecarService: &structs.ConsulSidecarService{
Tags: []string{"_tag1", "tag2"},
Port: "12345",
Proxy: &structs.ConsulProxy{
LocalServiceAddress: "1.2.3.4",
LocalServicePort: 10000,
Upstreams: []structs.ConsulUpstream{{
DestinationName: "_dest1",
Datacenter: "_datacenter1",
LocalBindPort: 10001,
}},
Expose: &structs.ConsulExposeConfig{
Paths: []structs.ConsulExposePath{{
Path: "_path1",
Protocol: "_protocol1",
ListenerPort: "_port2",
LocalPathPort: 10002,
}},
},
Config: map[string]interface{}{
"_config1": 1,
},
},
},
SidecarTask: &structs.SidecarTask{
Name: "name", // not interpolated by InterpolateServices
Driver: "_driver1",
User: "_user1",
Config: map[string]interface{}{"_config2": 2},
Env: map[string]string{"_env1": "_env2"},
Resources: &structs.Resources{
CPU: 1,
MemoryMB: 2,
DiskMB: 3,
IOPS: 4,
Networks: structs.Networks{{
Mode: "_mode1",
Device: "_device1",
CIDR: "10.0.0.0/64",
IP: "1.1.1.1",
MBits: 1,
DNS: &structs.DNSConfig{
Servers: []string{"10.0.0.1"},
Searches: []string{"10.0.0.2"},
Options: []string{"10.0.0.3"},
},
ReservedPorts: []structs.Port{{
Label: "_port3",
Value: 9000,
To: 9000,
HostNetwork: "_network1",
}},
DynamicPorts: []structs.Port{{
Label: "_port4",
Value: 9001,
To: 9001,
HostNetwork: "_network2",
}},
}},
Devices: structs.ResourceDevices{{
Name: "_resource1",
}},
},
Meta: map[string]string{"_meta1": "_meta2"},
KillTimeout: helper.TimeToPtr(1 * time.Second),
LogConfig: &structs.LogConfig{
MaxFiles: 1,
MaxFileSizeMB: 2,
},
ShutdownDelay: helper.TimeToPtr(2 * time.Second),
KillSignal: "_signal1",
},
Gateway: &structs.ConsulGateway{
Proxy: &structs.ConsulGatewayProxy{
ConnectTimeout: helper.TimeToPtr(3 * time.Second),
EnvoyGatewayBindTaggedAddresses: true,
EnvoyGatewayBindAddresses: map[string]*structs.ConsulGatewayBindAddress{
"_bind1": {
Address: "10.0.0.4",
Port: 8000,
},
},
EnvoyGatewayNoDefaultBind: true,
Config: map[string]interface{}{
"_config3": 4,
},
},
Ingress: &structs.ConsulIngressConfigEntry{
TLS: &structs.ConsulGatewayTLSConfig{
Enabled: true,
},
Listeners: []*structs.ConsulIngressListener{{
Protocol: "_protocol2",
Port: 8001,
Services: []*structs.ConsulIngressService{{
Name: "_service1",
Hosts: []string{"_host1", "host2"},
}},
}},
},
},
}, connect)
}


@@ -4,6 +4,7 @@ import (
"fmt"
"time"
"github.com/hashicorp/nomad/client/taskenv"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/structs"
@@ -173,11 +174,25 @@ func getNamedTaskForNativeService(tg *structs.TaskGroup, serviceName, taskName s
// probably need to hack this up to look for checks on the service, and if they
// qualify, configure a port for envoy to use to expose their paths.
func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error {
// Create an environment interpolator with what we have at submission time.
// This should only be used to interpolate connect service names which are
// used in sidecar or gateway task names. Note that the service name might
// also be interpolated with job specifics during service canonicalization.
env := taskenv.NewEmptyBuilder().UpdateTask(&structs.Allocation{
Job: job,
TaskGroup: g.Name,
}, nil).Build()
for _, service := range g.Services {
switch {
// mutate depending on what the connect block is being used for
case service.Connect.HasSidecar():
// interpolate the connect service name, which is used to create
// a name of an injected sidecar task
service.Name = env.ReplaceEnv(service.Name)
// Check to see if the sidecar task already exists
task := getSidecarTaskForService(g, service.Name)
@@ -233,6 +248,10 @@ func groupConnectHook(job *structs.Job, g *structs.TaskGroup) error {
}
case service.Connect.IsGateway():
// interpolate the connect service name, which is used to create
// a name of an injected gateway task
service.Name = env.ReplaceEnv(service.Name)
netHost := g.Networks[0].Mode == "host"
if !netHost && service.Connect.Gateway.Ingress != nil {
// Modify the gateway proxy service configuration to automatically
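The environment builder in the hook above only knows the job and the group at submission time, so only job-level variables such as NOMAD_META_* can be resolved when interpolating a connect service name; allocation- and runtime-specific variables are not available yet. A test-style sketch of that pattern, assuming it sits in package nomad alongside the tests below; the test name is illustrative, while the meta key and expected value mirror the ingress gateway test later in this diff.

```go
package nomad

// Sketch only: resolves a connect service name at submission time the same
// way groupConnectHook does above, using only job-level variables.

import (
	"testing"

	"github.com/hashicorp/nomad/client/taskenv"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/stretchr/testify/require"
)

func TestSketch_submissionTimeServiceNameInterpolation(t *testing.T) {
	job := mock.Job()
	job.Meta = map[string]string{"gateway_name": "my-gateway"}

	// Only the job and group are known at submission time, so only
	// job-level variables (e.g. NOMAD_META_*) can be resolved here.
	env := taskenv.NewEmptyBuilder().UpdateTask(&structs.Allocation{
		Job:       job,
		TaskGroup: job.TaskGroups[0].Name,
	}, nil).Build()

	require.Equal(t, "my-gateway", env.ReplaceEnv("${NOMAD_META_gateway_name}"))
}
```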


@@ -58,58 +58,58 @@ func TestJobEndpointConnect_groupConnectHook(t *testing.T) {
// Test that connect-proxy task is inserted for backend service
job := mock.Job()
job.Meta = map[string]string{
"backend_name": "backend",
"admin_name": "admin",
}
job.TaskGroups[0] = &structs.TaskGroup{
Networks: structs.Networks{
{
Mode: "bridge",
Networks: structs.Networks{{
Mode: "bridge",
}},
Services: []*structs.Service{{
Name: "${NOMAD_META_backend_name}",
PortLabel: "8080",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{},
},
},
Services: []*structs.Service{
{
Name: "backend",
PortLabel: "8080",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{},
},
}, {
Name: "${NOMAD_META_admin_name}",
PortLabel: "9090",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{},
},
{
Name: "admin",
PortLabel: "9090",
Connect: &structs.ConsulConnect{
SidecarService: &structs.ConsulSidecarService{},
},
},
},
}},
}
// Expected tasks
tgOut := job.TaskGroups[0].Copy()
tgOut.Tasks = []*structs.Task{
newConnectTask(tgOut.Services[0].Name),
newConnectTask(tgOut.Services[1].Name),
tgExp := job.TaskGroups[0].Copy()
tgExp.Tasks = []*structs.Task{
newConnectTask("backend"),
newConnectTask("admin"),
}
tgExp.Services[0].Name = "backend"
tgExp.Services[1].Name = "admin"
// Expect sidecar tasks to be properly canonicalized
tgOut.Tasks[0].Canonicalize(job, tgOut)
tgOut.Tasks[1].Canonicalize(job, tgOut)
tgOut.Networks[0].DynamicPorts = []structs.Port{
{
Label: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, "backend"),
To: -1,
},
{
Label: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, "admin"),
To: -1,
},
}
tgOut.Networks[0].Canonicalize()
tgExp.Tasks[0].Canonicalize(job, tgExp)
tgExp.Tasks[1].Canonicalize(job, tgExp)
tgExp.Networks[0].DynamicPorts = []structs.Port{{
Label: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, "backend"),
To: -1,
}, {
Label: fmt.Sprintf("%s-%s", structs.ConnectProxyPrefix, "admin"),
To: -1,
}}
tgExp.Networks[0].Canonicalize()
require.NoError(t, groupConnectHook(job, job.TaskGroups[0]))
require.Exactly(t, tgOut, job.TaskGroups[0])
require.Exactly(t, tgExp, job.TaskGroups[0])
// Test that hook is idempotent
require.NoError(t, groupConnectHook(job, job.TaskGroups[0]))
require.Exactly(t, tgOut, job.TaskGroups[0])
require.Exactly(t, tgExp, job.TaskGroups[0])
}
func TestJobEndpointConnect_groupConnectHook_IngressGateway(t *testing.T) {
@@ -120,11 +120,18 @@ func TestJobEndpointConnect_groupConnectHook_IngressGateway(t *testing.T) {
// block with correct configuration.
job := mock.ConnectIngressGatewayJob("bridge", false)
job.Meta = map[string]string{
"gateway_name": "my-gateway",
}
job.TaskGroups[0].Services[0].Name = "${NOMAD_META_gateway_name}"
expTG := job.TaskGroups[0].Copy()
expTG.Tasks = []*structs.Task{
// inject the gateway task
newConnectGatewayTask(expTG.Services[0].Name, false),
newConnectGatewayTask("my-gateway", false),
}
expTG.Services[0].Name = "my-gateway"
expTG.Tasks[0].Canonicalize(job, expTG)
expTG.Networks[0].Canonicalize()


@@ -1212,6 +1212,7 @@ func TestJobEndpoint_Register_Dispatched(t *testing.T) {
// Create the register request with a job with 'Dispatch' set to true
job := mock.Job()
job.Dispatched = true
job.ParameterizedJob = &structs.ParameterizedJobConfig{}
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{