From 793521d1fe2a5f2931828113e9ae719516fdfe23 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sat, 12 Sep 2015 19:34:46 -0700 Subject: [PATCH] scheduler: adding helper library for network assignments --- nomad/mock/mock.go | 15 ++- scheduler/network.go | 173 ++++++++++++++++++++++++++++++++ scheduler/network_test.go | 203 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 387 insertions(+), 4 deletions(-) create mode 100644 scheduler/network.go create mode 100644 scheduler/network_test.go diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go index dc852eef7..310a22812 100644 --- a/nomad/mock/mock.go +++ b/nomad/mock/mock.go @@ -20,10 +20,9 @@ func Node() *structs.Node { IOPS: 150, Networks: []*structs.NetworkResource{ &structs.NetworkResource{ - Device: "eth0", - CIDR: "192.168.0.100/32", - ReservedPorts: []int{22}, - MBits: 1000, + Device: "eth0", + CIDR: "192.168.0.100/32", + MBits: 1000, }, }, }, @@ -31,6 +30,14 @@ func Node() *structs.Node { CPU: 0.1, MemoryMB: 256, DiskMB: 4 * 1024, + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + Device: "eth0", + IP: "192.168.0.100", + ReservedPorts: []int{22}, + MBits: 1, + }, + }, }, Links: map[string]string{ "consul": "foobar.dc1", diff --git a/scheduler/network.go b/scheduler/network.go new file mode 100644 index 000000000..2d2f8b4eb --- /dev/null +++ b/scheduler/network.go @@ -0,0 +1,173 @@ +package scheduler + +import ( + "math/rand" + "net" + + "github.com/hashicorp/nomad/nomad/structs" +) + +const ( + // MinDynamicPort is the smallest dynamic port generated + MinDynamicPort = 20000 + + // MaxDynamicPort is the largest dynamic port generated + MaxDynamicPort = 60000 + + // maxRandPortAttempts is the maximum number of attempt + // to assign a random port + maxRandPortAttempts = 20 +) + +// NetworkIndex is used to index the available network resources +// and the used network resources on a machine given allocations +type NetworkIndex struct { + AvailNetworks []*structs.NetworkResource // List of available networks + AvailBandwidth map[string]int // Bandwidth by device + UsedPorts map[string]map[int]struct{} // Ports by IP + UsedBandwidth map[string]int // Bandwidth by device +} + +// NewNetworkIndex is used to construct a new network index +func NewNetworkIndex() *NetworkIndex { + return &NetworkIndex{ + AvailBandwidth: make(map[string]int), + UsedPorts: make(map[string]map[int]struct{}), + UsedBandwidth: make(map[string]int), + } +} + +// SetNode is used to setup the available network resources +func (idx *NetworkIndex) SetNode(node *structs.Node) { + // Add the available CIDR blocks + for _, n := range node.Resources.Networks { + if n.CIDR != "" { + idx.AvailNetworks = append(idx.AvailNetworks, n) + idx.AvailBandwidth[n.Device] = n.MBits + } + } + + // Add the reserved resources + if r := node.Reserved; r != nil { + for _, n := range r.Networks { + idx.addReserved(n) + } + } +} + +// AddAllocs is used to add the used network resources +func (idx *NetworkIndex) AddAllocs(allocs []*structs.Allocation) { + for _, alloc := range allocs { + for _, task := range alloc.TaskResources { + if len(task.Networks) == 0 { + continue + } + n := task.Networks[0] + idx.addReserved(n) + } + } +} + +// addReserved is used to add a reserved network usage +func (idx *NetworkIndex) addReserved(n *structs.NetworkResource) { + // Add the port usage + used := idx.UsedPorts[n.IP] + if used == nil { + used = make(map[int]struct{}) + idx.UsedPorts[n.IP] = used + } + for _, port := range n.ReservedPorts { + used[port] = struct{}{} + } + + // Add the bandwidth + idx.UsedBandwidth[n.Device] += n.MBits +} + +// yieldIP is used to iteratively invoke the callback with +// an available IP +func (idx *NetworkIndex) yieldIP(cb func(net *structs.NetworkResource, ip net.IP) bool) { + inc := func(ip net.IP) { + for j := len(ip) - 1; j >= 0; j-- { + ip[j]++ + if ip[j] > 0 { + break + } + } + } + + for _, n := range idx.AvailNetworks { + ip, ipnet, err := net.ParseCIDR(n.CIDR) + if err != nil { + continue + } + for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { + if cb(n, ip) { + return + } + } + } +} + +// AssignNetwork is used to assign network resources given an ask +func (idx *NetworkIndex) AssignNetwork(ask *structs.NetworkResource) (out *structs.NetworkResource) { + idx.yieldIP(func(n *structs.NetworkResource, ip net.IP) (stop bool) { + // Convert the IP to a string + ipStr := ip.String() + + // Check if we would exceed the bandwidth cap + availBandwidth := idx.AvailBandwidth[n.Device] + usedBandwidth := idx.UsedBandwidth[n.Device] + if usedBandwidth+ask.MBits > availBandwidth { + return + } + + // Check if any of the reserved ports are in use + for _, port := range ask.ReservedPorts { + if _, ok := idx.UsedPorts[ipStr][port]; ok { + return false + } + } + + // Create the offer + offer := &structs.NetworkResource{ + Device: n.Device, + IP: ipStr, + ReservedPorts: ask.ReservedPorts, + } + + // Check if we need to generate any ports + for i := 0; i < ask.DynamicPorts; i++ { + attempts := 0 + PICK: + attempts++ + if attempts > maxRandPortAttempts { + return + } + + randPort := MinDynamicPort + rand.Intn(MaxDynamicPort-MinDynamicPort) + if _, ok := idx.UsedPorts[ipStr][randPort]; ok { + goto PICK + } + if intContains(offer.ReservedPorts, randPort) { + goto PICK + } + offer.ReservedPorts = append(offer.ReservedPorts, randPort) + } + + // Stop, we have an offer! + out = offer + return true + }) + return +} + +// intContains scans an integer slice for a value +func intContains(haystack []int, needle int) bool { + for _, item := range haystack { + if item == needle { + return true + } + } + return false +} diff --git a/scheduler/network_test.go b/scheduler/network_test.go new file mode 100644 index 000000000..eae22c354 --- /dev/null +++ b/scheduler/network_test.go @@ -0,0 +1,203 @@ +package scheduler + +import ( + "net" + "reflect" + "testing" + + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +func TestNetworkIndex_SetNode(t *testing.T) { + idx := NewNetworkIndex() + n := mock.Node() + idx.SetNode(n) + + if len(idx.AvailNetworks) != 1 { + t.Fatalf("Bad") + } + if idx.AvailBandwidth["eth0"] != 1000 { + t.Fatalf("Bad") + } + if idx.UsedBandwidth["eth0"] != 1 { + t.Fatalf("Bad") + } + if _, ok := idx.UsedPorts["192.168.0.100"][22]; !ok { + t.Fatalf("Bad") + } +} + +func TestNetworkIndex_AddAllocs(t *testing.T) { + idx := NewNetworkIndex() + allocs := []*structs.Allocation{ + &structs.Allocation{ + TaskResources: map[string]*structs.Resources{ + "web": &structs.Resources{ + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + Device: "eth0", + IP: "192.168.0.100", + MBits: 20, + ReservedPorts: []int{8000, 9000}, + }, + }, + }, + }, + }, + &structs.Allocation{ + TaskResources: map[string]*structs.Resources{ + "api": &structs.Resources{ + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + ReservedPorts: []int{10000}, + }, + }, + }, + }, + }, + } + idx.AddAllocs(allocs) + + if idx.UsedBandwidth["eth0"] != 70 { + t.Fatalf("Bad") + } + if _, ok := idx.UsedPorts["192.168.0.100"][8000]; !ok { + t.Fatalf("Bad") + } + if _, ok := idx.UsedPorts["192.168.0.100"][9000]; !ok { + t.Fatalf("Bad") + } + if _, ok := idx.UsedPorts["192.168.0.100"][10000]; !ok { + t.Fatalf("Bad") + } +} + +func TestNetworkIndex_yieldIP(t *testing.T) { + idx := NewNetworkIndex() + n := mock.Node() + n.Resources.Networks[0].CIDR = "192.168.0.100/30" + idx.SetNode(n) + + var out []string + idx.yieldIP(func(n *structs.NetworkResource, ip net.IP) (stop bool) { + out = append(out, ip.String()) + return + }) + + expect := []string{"192.168.0.100", "192.168.0.101", + "192.168.0.102", "192.168.0.103"} + if !reflect.DeepEqual(out, expect) { + t.Fatalf("bad: %v", out) + } +} + +func TestNetworkIndex_AssignNetwork(t *testing.T) { + idx := NewNetworkIndex() + n := mock.Node() + n.Resources.Networks[0].CIDR = "192.168.0.100/30" + idx.SetNode(n) + + allocs := []*structs.Allocation{ + &structs.Allocation{ + TaskResources: map[string]*structs.Resources{ + "web": &structs.Resources{ + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + Device: "eth0", + IP: "192.168.0.100", + MBits: 20, + ReservedPorts: []int{8000, 9000}, + }, + }, + }, + }, + }, + &structs.Allocation{ + TaskResources: map[string]*structs.Resources{ + "api": &structs.Resources{ + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + Device: "eth0", + IP: "192.168.0.100", + MBits: 50, + ReservedPorts: []int{10000}, + }, + }, + }, + }, + }, + } + idx.AddAllocs(allocs) + + // Ask for a reserved port + ask := &structs.NetworkResource{ + ReservedPorts: []int{8000}, + } + offer := idx.AssignNetwork(ask) + if offer == nil { + t.Fatalf("bad") + } + if offer.IP != "192.168.0.101" { + t.Fatalf("bad: %#v", offer) + } + if len(offer.ReservedPorts) != 1 || offer.ReservedPorts[0] != 8000 { + t.Fatalf("bad: %#v", offer) + } + + // Ask for dynamic ports + ask = &structs.NetworkResource{ + DynamicPorts: 3, + } + offer = idx.AssignNetwork(ask) + if offer == nil { + t.Fatalf("bad") + } + if offer.IP != "192.168.0.100" { + t.Fatalf("bad: %#v", offer) + } + if len(offer.ReservedPorts) != 3 { + t.Fatalf("bad: %#v", offer) + } + + // Ask for reserved + dynamic ports + ask = &structs.NetworkResource{ + ReservedPorts: []int{12345}, + DynamicPorts: 3, + } + offer = idx.AssignNetwork(ask) + if offer == nil { + t.Fatalf("bad") + } + if offer.IP != "192.168.0.100" { + t.Fatalf("bad: %#v", offer) + } + if len(offer.ReservedPorts) != 4 || offer.ReservedPorts[0] != 12345 { + t.Fatalf("bad: %#v", offer) + } + + // Ask for too much bandwidth + ask = &structs.NetworkResource{ + MBits: 1000, + } + offer = idx.AssignNetwork(ask) + if offer != nil { + t.Fatalf("bad") + } +} + +func TestIntContains(t *testing.T) { + l := []int{1, 2, 10, 20} + if intContains(l, 50) { + t.Fatalf("bad") + } + if !intContains(l, 20) { + t.Fatalf("bad") + } + if !intContains(l, 1) { + t.Fatalf("bad") + } +}