Add network stanza to group

Adds a network stanza and additional options to the task group level
in prep for allowing shared networking between tasks of an alloc.
This commit is contained in:
Nick Ethier
2019-04-29 13:15:12 -04:00
parent 756177c70c
commit b60bc8c17d
7 changed files with 202 additions and 63 deletions

View File

@@ -86,11 +86,13 @@ func (r *Resources) Merge(other *Resources) {
type Port struct {
Label string
Value int `mapstructure:"static"`
To int `mapstructure:"to"`
}
// NetworkResource is used to describe required network
// resources of a given task.
type NetworkResource struct {
Mode string
Device string
CIDR string
IP string

View File

@@ -493,6 +493,7 @@ type TaskGroup struct {
EphemeralDisk *EphemeralDisk
Update *UpdateStrategy
Migrate *MigrateStrategy
Networks []*NetworkResource
Meta map[string]string
}
@@ -604,6 +605,9 @@ func (g *TaskGroup) Canonicalize(job *Job) {
for _, a := range g.Affinities {
a.Canonicalize()
}
for _, n := range g.Networks {
n.Canonicalize()
}
}
// Constrain is used to add a constraint to a task group.

View File

@@ -685,6 +685,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) {
tg.Meta = taskGroup.Meta
tg.Constraints = ApiConstraintsToStructs(taskGroup.Constraints)
tg.Affinities = ApiAffinitiesToStructs(taskGroup.Affinities)
tg.Networks = ApiNetworkResourceToStructs(taskGroup.Networks)
tg.RestartPolicy = &structs.RestartPolicy{
Attempts: *taskGroup.RestartPolicy.Attempts,
@@ -886,35 +887,8 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources {
out.IOPS = *in.IOPS
}
if l := len(in.Networks); l != 0 {
out.Networks = make([]*structs.NetworkResource, l)
for i, nw := range in.Networks {
out.Networks[i] = &structs.NetworkResource{
CIDR: nw.CIDR,
IP: nw.IP,
MBits: *nw.MBits,
}
if l := len(nw.DynamicPorts); l != 0 {
out.Networks[i].DynamicPorts = make([]structs.Port, l)
for j, dp := range nw.DynamicPorts {
out.Networks[i].DynamicPorts[j] = structs.Port{
Label: dp.Label,
Value: dp.Value,
}
}
}
if l := len(nw.ReservedPorts); l != 0 {
out.Networks[i].ReservedPorts = make([]structs.Port, l)
for j, rp := range nw.ReservedPorts {
out.Networks[i].ReservedPorts[j] = structs.Port{
Label: rp.Label,
Value: rp.Value,
}
}
}
}
if len(in.Networks) != 0 {
out.Networks = ApiNetworkResourceToStructs(in.Networks)
}
if l := len(in.Devices); l != 0 {
@@ -932,6 +906,43 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources {
return out
}
func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkResource {
var out []*structs.NetworkResource
if l := len(in); l != 0 {
out = make([]*structs.NetworkResource, l)
for i, nw := range in {
out[i] = &structs.NetworkResource{
Mode: nw.Mode,
CIDR: nw.CIDR,
IP: nw.IP,
MBits: *nw.MBits,
}
if l := len(nw.DynamicPorts); l != 0 {
out[i].DynamicPorts = make([]structs.Port, l)
for j, dp := range nw.DynamicPorts {
out[i].DynamicPorts[j] = structs.Port{
Label: dp.Label,
Value: dp.Value,
}
}
}
if l := len(nw.ReservedPorts); l != 0 {
out[i].ReservedPorts = make([]structs.Port, l)
for j, rp := range nw.ReservedPorts {
out[i].ReservedPorts[j] = structs.Port{
Label: rp.Label,
Value: rp.Value,
}
}
}
}
}
return out
}
func ApiConstraintsToStructs(in []*api.Constraint) []*structs.Constraint {
if in == nil {
return nil

View File

@@ -314,6 +314,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
"vault",
"migrate",
"spread",
"network",
}
if err := helper.CheckHCLKeys(listVal, valid); err != nil {
return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n))
@@ -333,6 +334,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
delete(m, "vault")
delete(m, "migrate")
delete(m, "spread")
delete(m, "network")
// Build the group with the basic decode
var g api.TaskGroup
@@ -369,6 +371,15 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error {
}
}
// Parse network
if o := listVal.Filter("network"); len(o.Items) > 0 {
networks, err := parseNetwork(o)
if err != nil {
return err
}
g.Networks = []*api.NetworkResource{networks}
}
// Parse reschedule policy
if o := listVal.Filter("reschedule"); len(o.Items) > 0 {
if err := parseReschedulePolicy(&g.ReschedulePolicy, o); err != nil {
@@ -1433,39 +1444,11 @@ func parseResources(result *api.Resources, list *ast.ObjectList) error {
// Parse the network resources
if o := listVal.Filter("network"); len(o.Items) > 0 {
if len(o.Items) > 1 {
return fmt.Errorf("only one 'network' resource allowed")
r, err := parseNetwork(o)
if err != nil {
return fmt.Errorf("resource, %v", err)
}
// Check for invalid keys
valid := []string{
"mbits",
"port",
}
if err := helper.CheckHCLKeys(o.Items[0].Val, valid); err != nil {
return multierror.Prefix(err, "resources, network ->")
}
var r api.NetworkResource
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Items[0].Val); err != nil {
return err
}
if err := mapstructure.WeakDecode(m, &r); err != nil {
return err
}
var networkObj *ast.ObjectList
if ot, ok := o.Items[0].Val.(*ast.ObjectType); ok {
networkObj = ot.List
} else {
return fmt.Errorf("resource: should be an object")
}
if err := parsePorts(networkObj, &r); err != nil {
return multierror.Prefix(err, "resources, network, ports ->")
}
result.Networks = []*api.NetworkResource{&r}
result.Networks = []*api.NetworkResource{r}
}
// Parse the device resources
@@ -1535,11 +1518,49 @@ func parseResources(result *api.Resources, list *ast.ObjectList) error {
return nil
}
func parseNetwork(o *ast.ObjectList) (*api.NetworkResource, error) {
if len(o.Items) > 1 {
return nil, fmt.Errorf("only one 'network' resource allowed")
}
// Check for invalid keys
valid := []string{
"mode",
"mbits",
"port",
}
if err := helper.CheckHCLKeys(o.Items[0].Val, valid); err != nil {
return nil, multierror.Prefix(err, "network ->")
}
var r api.NetworkResource
var m map[string]interface{}
if err := hcl.DecodeObject(&m, o.Items[0].Val); err != nil {
return nil, err
}
if err := mapstructure.WeakDecode(m, &r); err != nil {
return nil, err
}
var networkObj *ast.ObjectList
if ot, ok := o.Items[0].Val.(*ast.ObjectType); ok {
networkObj = ot.List
} else {
return nil, fmt.Errorf("should be an object")
}
if err := parsePorts(networkObj, &r); err != nil {
return nil, multierror.Prefix(err, "network, ports ->")
}
return &r, nil
}
func parsePorts(networkObj *ast.ObjectList, nw *api.NetworkResource) error {
// Check for invalid keys
valid := []string{
"mbits",
"port",
"mode",
}
if err := helper.CheckHCLKeys(networkObj, valid); err != nil {
return err

View File

@@ -871,6 +871,50 @@ func TestParse(t *testing.T) {
},
false,
},
{
"tg-network.hcl",
&api.Job{
ID: helper.StringToPtr("foo"),
Name: helper.StringToPtr("foo"),
Datacenters: []string{"dc1"},
TaskGroups: []*api.TaskGroup{
{
Name: helper.StringToPtr("bar"),
Count: helper.IntToPtr(3),
Networks: []*api.NetworkResource{
{
Mode: "bridge",
ReservedPorts: []api.Port{
{
Label: "http",
Value: 80,
To: 8080,
},
},
},
},
Tasks: []*api.Task{
{
Name: "bar",
Driver: "raw_exec",
Config: map[string]interface{}{
"command": "bash",
"args": []interface{}{"-c", "echo hi"},
},
Resources: &api.Resources{
Networks: []*api.NetworkResource{
{
MBits: helper.IntToPtr(10),
},
},
},
},
},
},
},
},
false,
},
}
for _, tc := range cases {

View File

@@ -0,0 +1,25 @@
job "foo" {
datacenters = ["dc1"]
group "bar" {
count = 3
network {
mode = "bridge"
port "http" {
static = 80
to = 8080
}
}
task "bar" {
driver = "raw_exec"
config {
command = "bash"
args = ["-c", "echo hi"]
}
resources {
network {
mbits = 10
}
}
}
}
}

View File

@@ -2011,11 +2011,13 @@ func (r *Resources) GoString() string {
type Port struct {
Label string
Value int
To int
}
// NetworkResource is used to represent available network
// resources
type NetworkResource struct {
Mode string // Mode of the network
Device string // Name of the device
CIDR string // CIDR block of addresses
IP string // Host IP address
@@ -2025,6 +2027,10 @@ type NetworkResource struct {
}
func (nr *NetworkResource) Equals(other *NetworkResource) bool {
if nr.Mode != other.Mode {
return false
}
if nr.Device != other.Device {
return false
}
@@ -2970,15 +2976,17 @@ func (a *AllocatedTaskResources) Subtract(delta *AllocatedTaskResources) {
// AllocatedSharedResources are the set of resources allocated to a task group.
type AllocatedSharedResources struct {
DiskMB int64
Networks Networks
DiskMB int64
}
func (a *AllocatedSharedResources) Add(delta *AllocatedSharedResources) {
if delta == nil {
return
}
a.Networks = append(a.Networks, delta.Networks...)
a.DiskMB += delta.DiskMB
}
func (a *AllocatedSharedResources) Subtract(delta *AllocatedSharedResources) {
@@ -2986,6 +2994,17 @@ func (a *AllocatedSharedResources) Subtract(delta *AllocatedSharedResources) {
return
}
diff := map[*NetworkResource]bool{}
for _, n := range delta.Networks {
diff[n] = true
}
var nets Networks
for _, n := range a.Networks {
if _, ok := diff[n]; !ok {
nets = append(nets, n)
}
}
a.Networks = nets
a.DiskMB -= delta.DiskMB
}
@@ -4623,6 +4642,10 @@ type TaskGroup struct {
// Spread can be specified at the task group level to express spreading
// allocations across a desired attribute, such as datacenter
Spreads []*Spread
// Networks are the network configuration for the task group. This can be
// overriden in the task.
Networks Networks
}
func (tg *TaskGroup) Copy() *TaskGroup {
@@ -4638,6 +4661,15 @@ func (tg *TaskGroup) Copy() *TaskGroup {
ntg.Affinities = CopySliceAffinities(ntg.Affinities)
ntg.Spreads = CopySliceSpreads(ntg.Spreads)
// Copy the network objects
if tg.Networks != nil {
n := len(tg.Networks)
ntg.Networks = make([]*NetworkResource, n)
for i := 0; i < n; i++ {
ntg.Networks[i] = tg.Networks[i].Copy()
}
}
if tg.Tasks != nil {
tasks := make([]*Task, len(ntg.Tasks))
for i, t := range ntg.Tasks {