diff --git a/api/resources.go b/api/resources.go index c0c0a4fc7..610ea8a8e 100644 --- a/api/resources.go +++ b/api/resources.go @@ -86,11 +86,13 @@ func (r *Resources) Merge(other *Resources) { type Port struct { Label string Value int `mapstructure:"static"` + To int `mapstructure:"to"` } // NetworkResource is used to describe required network // resources of a given task. type NetworkResource struct { + Mode string Device string CIDR string IP string diff --git a/api/tasks.go b/api/tasks.go index 693287673..e26c04015 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -493,6 +493,7 @@ type TaskGroup struct { EphemeralDisk *EphemeralDisk Update *UpdateStrategy Migrate *MigrateStrategy + Networks []*NetworkResource Meta map[string]string } @@ -604,6 +605,9 @@ func (g *TaskGroup) Canonicalize(job *Job) { for _, a := range g.Affinities { a.Canonicalize() } + for _, n := range g.Networks { + n.Canonicalize() + } } // Constrain is used to add a constraint to a task group. diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 1279a9ce9..ceb14fb5d 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -685,6 +685,7 @@ func ApiTgToStructsTG(taskGroup *api.TaskGroup, tg *structs.TaskGroup) { tg.Meta = taskGroup.Meta tg.Constraints = ApiConstraintsToStructs(taskGroup.Constraints) tg.Affinities = ApiAffinitiesToStructs(taskGroup.Affinities) + tg.Networks = ApiNetworkResourceToStructs(taskGroup.Networks) tg.RestartPolicy = &structs.RestartPolicy{ Attempts: *taskGroup.RestartPolicy.Attempts, @@ -886,35 +887,8 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources { out.IOPS = *in.IOPS } - if l := len(in.Networks); l != 0 { - out.Networks = make([]*structs.NetworkResource, l) - for i, nw := range in.Networks { - out.Networks[i] = &structs.NetworkResource{ - CIDR: nw.CIDR, - IP: nw.IP, - MBits: *nw.MBits, - } - - if l := len(nw.DynamicPorts); l != 0 { - out.Networks[i].DynamicPorts = make([]structs.Port, l) - for j, dp := range nw.DynamicPorts { - out.Networks[i].DynamicPorts[j] = structs.Port{ - Label: dp.Label, - Value: dp.Value, - } - } - } - - if l := len(nw.ReservedPorts); l != 0 { - out.Networks[i].ReservedPorts = make([]structs.Port, l) - for j, rp := range nw.ReservedPorts { - out.Networks[i].ReservedPorts[j] = structs.Port{ - Label: rp.Label, - Value: rp.Value, - } - } - } - } + if len(in.Networks) != 0 { + out.Networks = ApiNetworkResourceToStructs(in.Networks) } if l := len(in.Devices); l != 0 { @@ -932,6 +906,43 @@ func ApiResourcesToStructs(in *api.Resources) *structs.Resources { return out } +func ApiNetworkResourceToStructs(in []*api.NetworkResource) []*structs.NetworkResource { + var out []*structs.NetworkResource + if l := len(in); l != 0 { + out = make([]*structs.NetworkResource, l) + for i, nw := range in { + out[i] = &structs.NetworkResource{ + Mode: nw.Mode, + CIDR: nw.CIDR, + IP: nw.IP, + MBits: *nw.MBits, + } + + if l := len(nw.DynamicPorts); l != 0 { + out[i].DynamicPorts = make([]structs.Port, l) + for j, dp := range nw.DynamicPorts { + out[i].DynamicPorts[j] = structs.Port{ + Label: dp.Label, + Value: dp.Value, + } + } + } + + if l := len(nw.ReservedPorts); l != 0 { + out[i].ReservedPorts = make([]structs.Port, l) + for j, rp := range nw.ReservedPorts { + out[i].ReservedPorts[j] = structs.Port{ + Label: rp.Label, + Value: rp.Value, + } + } + } + } + } + + return out +} + func ApiConstraintsToStructs(in []*api.Constraint) []*structs.Constraint { if in == nil { return nil diff --git a/jobspec/parse.go b/jobspec/parse.go index d881866c1..ca61bad84 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -314,6 +314,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { "vault", "migrate", "spread", + "network", } if err := helper.CheckHCLKeys(listVal, valid); err != nil { return multierror.Prefix(err, fmt.Sprintf("'%s' ->", n)) @@ -333,6 +334,7 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { delete(m, "vault") delete(m, "migrate") delete(m, "spread") + delete(m, "network") // Build the group with the basic decode var g api.TaskGroup @@ -369,6 +371,15 @@ func parseGroups(result *api.Job, list *ast.ObjectList) error { } } + // Parse network + if o := listVal.Filter("network"); len(o.Items) > 0 { + networks, err := parseNetwork(o) + if err != nil { + return err + } + g.Networks = []*api.NetworkResource{networks} + } + // Parse reschedule policy if o := listVal.Filter("reschedule"); len(o.Items) > 0 { if err := parseReschedulePolicy(&g.ReschedulePolicy, o); err != nil { @@ -1433,39 +1444,11 @@ func parseResources(result *api.Resources, list *ast.ObjectList) error { // Parse the network resources if o := listVal.Filter("network"); len(o.Items) > 0 { - if len(o.Items) > 1 { - return fmt.Errorf("only one 'network' resource allowed") + r, err := parseNetwork(o) + if err != nil { + return fmt.Errorf("resource, %v", err) } - - // Check for invalid keys - valid := []string{ - "mbits", - "port", - } - if err := helper.CheckHCLKeys(o.Items[0].Val, valid); err != nil { - return multierror.Prefix(err, "resources, network ->") - } - - var r api.NetworkResource - var m map[string]interface{} - if err := hcl.DecodeObject(&m, o.Items[0].Val); err != nil { - return err - } - if err := mapstructure.WeakDecode(m, &r); err != nil { - return err - } - - var networkObj *ast.ObjectList - if ot, ok := o.Items[0].Val.(*ast.ObjectType); ok { - networkObj = ot.List - } else { - return fmt.Errorf("resource: should be an object") - } - if err := parsePorts(networkObj, &r); err != nil { - return multierror.Prefix(err, "resources, network, ports ->") - } - - result.Networks = []*api.NetworkResource{&r} + result.Networks = []*api.NetworkResource{r} } // Parse the device resources @@ -1535,11 +1518,49 @@ func parseResources(result *api.Resources, list *ast.ObjectList) error { return nil } +func parseNetwork(o *ast.ObjectList) (*api.NetworkResource, error) { + if len(o.Items) > 1 { + return nil, fmt.Errorf("only one 'network' resource allowed") + } + + // Check for invalid keys + valid := []string{ + "mode", + "mbits", + "port", + } + if err := helper.CheckHCLKeys(o.Items[0].Val, valid); err != nil { + return nil, multierror.Prefix(err, "network ->") + } + + var r api.NetworkResource + var m map[string]interface{} + if err := hcl.DecodeObject(&m, o.Items[0].Val); err != nil { + return nil, err + } + if err := mapstructure.WeakDecode(m, &r); err != nil { + return nil, err + } + + var networkObj *ast.ObjectList + if ot, ok := o.Items[0].Val.(*ast.ObjectType); ok { + networkObj = ot.List + } else { + return nil, fmt.Errorf("should be an object") + } + if err := parsePorts(networkObj, &r); err != nil { + return nil, multierror.Prefix(err, "network, ports ->") + } + + return &r, nil +} + func parsePorts(networkObj *ast.ObjectList, nw *api.NetworkResource) error { // Check for invalid keys valid := []string{ "mbits", "port", + "mode", } if err := helper.CheckHCLKeys(networkObj, valid); err != nil { return err diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 6fa295134..a8c36f6a3 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -871,6 +871,50 @@ func TestParse(t *testing.T) { }, false, }, + { + "tg-network.hcl", + &api.Job{ + ID: helper.StringToPtr("foo"), + Name: helper.StringToPtr("foo"), + Datacenters: []string{"dc1"}, + TaskGroups: []*api.TaskGroup{ + { + Name: helper.StringToPtr("bar"), + Count: helper.IntToPtr(3), + Networks: []*api.NetworkResource{ + { + Mode: "bridge", + ReservedPorts: []api.Port{ + { + Label: "http", + Value: 80, + To: 8080, + }, + }, + }, + }, + Tasks: []*api.Task{ + { + Name: "bar", + Driver: "raw_exec", + Config: map[string]interface{}{ + "command": "bash", + "args": []interface{}{"-c", "echo hi"}, + }, + Resources: &api.Resources{ + Networks: []*api.NetworkResource{ + { + MBits: helper.IntToPtr(10), + }, + }, + }, + }, + }, + }, + }, + }, + false, + }, } for _, tc := range cases { diff --git a/jobspec/test-fixtures/tg-network.hcl b/jobspec/test-fixtures/tg-network.hcl new file mode 100644 index 000000000..9f921b44f --- /dev/null +++ b/jobspec/test-fixtures/tg-network.hcl @@ -0,0 +1,25 @@ +job "foo" { + datacenters = ["dc1"] + group "bar" { + count = 3 + network { + mode = "bridge" + port "http" { + static = 80 + to = 8080 + } + } + task "bar" { + driver = "raw_exec" + config { + command = "bash" + args = ["-c", "echo hi"] + } + resources { + network { + mbits = 10 + } + } + } + } +} diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 8abb1a325..f9f0c6b4e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -2011,11 +2011,13 @@ func (r *Resources) GoString() string { type Port struct { Label string Value int + To int } // NetworkResource is used to represent available network // resources type NetworkResource struct { + Mode string // Mode of the network Device string // Name of the device CIDR string // CIDR block of addresses IP string // Host IP address @@ -2025,6 +2027,10 @@ type NetworkResource struct { } func (nr *NetworkResource) Equals(other *NetworkResource) bool { + if nr.Mode != other.Mode { + return false + } + if nr.Device != other.Device { return false } @@ -2970,15 +2976,17 @@ func (a *AllocatedTaskResources) Subtract(delta *AllocatedTaskResources) { // AllocatedSharedResources are the set of resources allocated to a task group. type AllocatedSharedResources struct { - DiskMB int64 + Networks Networks + DiskMB int64 } func (a *AllocatedSharedResources) Add(delta *AllocatedSharedResources) { if delta == nil { return } - + a.Networks = append(a.Networks, delta.Networks...) a.DiskMB += delta.DiskMB + } func (a *AllocatedSharedResources) Subtract(delta *AllocatedSharedResources) { @@ -2986,6 +2994,17 @@ func (a *AllocatedSharedResources) Subtract(delta *AllocatedSharedResources) { return } + diff := map[*NetworkResource]bool{} + for _, n := range delta.Networks { + diff[n] = true + } + var nets Networks + for _, n := range a.Networks { + if _, ok := diff[n]; !ok { + nets = append(nets, n) + } + } + a.Networks = nets a.DiskMB -= delta.DiskMB } @@ -4623,6 +4642,10 @@ type TaskGroup struct { // Spread can be specified at the task group level to express spreading // allocations across a desired attribute, such as datacenter Spreads []*Spread + + // Networks are the network configuration for the task group. This can be + // overriden in the task. + Networks Networks } func (tg *TaskGroup) Copy() *TaskGroup { @@ -4638,6 +4661,15 @@ func (tg *TaskGroup) Copy() *TaskGroup { ntg.Affinities = CopySliceAffinities(ntg.Affinities) ntg.Spreads = CopySliceSpreads(ntg.Spreads) + // Copy the network objects + if tg.Networks != nil { + n := len(tg.Networks) + ntg.Networks = make([]*NetworkResource, n) + for i := 0; i < n; i++ { + ntg.Networks[i] = tg.Networks[i].Copy() + } + } + if tg.Tasks != nil { tasks := make([]*Task, len(ntg.Tasks)) for i, t := range ntg.Tasks {