From b5618163430a89b6face0cf333b91a00a66ae6e6 Mon Sep 17 00:00:00 2001
From: Preetha Appan
Date: Wed, 3 Jul 2019 13:29:47 -0500
Subject: [PATCH] Scheduler changes to support network at task group level

Also includes unit tests for binpacker and preemption.
The tests verify that network resources specified at the
task group level are properly accounted for
---
 nomad/structs/network.go     |   9 ++
 nomad/structs/structs.go     |  10 +-
 scheduler/preemption_test.go |  25 +++-
 scheduler/rank_test.go       | 242 ++++++++++++++++++++++++++++++++++-
 scheduler/system_sched.go    |   7 +-
 5 files changed, 287 insertions(+), 6 deletions(-)

diff --git a/nomad/structs/network.go b/nomad/structs/network.go
index aaa81b64e..d76eeaf96 100644
--- a/nomad/structs/network.go
+++ b/nomad/structs/network.go
@@ -113,6 +113,15 @@ func (idx *NetworkIndex) AddAllocs(allocs []*Allocation) (collide bool) {
 		}
 
 		if alloc.AllocatedResources != nil {
+			// Add network resources that are at the task group level
+			if len(alloc.AllocatedResources.Shared.Networks) > 0 {
+				for _, network := range alloc.AllocatedResources.Shared.Networks {
+					if idx.AddReserved(network) {
+						collide = true
+					}
+				}
+			}
+
 			for _, task := range alloc.AllocatedResources.Tasks {
 				if len(task.Networks) == 0 {
 					continue
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index e91312c99..4d6408869 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2857,6 +2857,14 @@
 	for _, r := range a.Tasks {
 		c.Flattened.Add(r)
 	}
+	// Add network resources that are at the task group level
+	if len(a.Shared.Networks) > 0 {
+		for _, network := range a.Shared.Networks {
+			c.Flattened.Add(&AllocatedTaskResources{
+				Networks: []*NetworkResource{network},
+			})
+		}
+	}
 
 	return c
 }
@@ -8079,7 +8087,7 @@ func (a *Allocation) SetEventDisplayMessages() {
 }
 
 // COMPAT(0.11): Remove in 0.11
-// ComparableResources returns the resouces on the allocation
+// ComparableResources returns the resources on the allocation
 // handling upgrade paths.
 // After 0.11 calls to this should be replaced with:
 // alloc.AllocatedResources.Comparable()
 func (a *Allocation) ComparableResources() *ComparableResources {
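Not part of the patch, but a quick sketch of what the structs.go change above buys downstream consumers (assumes the patched nomad/structs package; illustration only): Comparable() now folds group-level networks into the flattened view, so any bandwidth accounting built on it sees task-level and group-level asks together.

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	res := &structs.AllocatedResources{
		Tasks: map[string]*structs.AllocatedTaskResources{
			"web": {Networks: []*structs.NetworkResource{{Device: "eth0", MBits: 300}}},
		},
		Shared: structs.AllocatedSharedResources{
			Networks: []*structs.NetworkResource{{Device: "eth0", MBits: 500}},
		},
	}

	// Sum every network in the flattened view.
	total := 0
	for _, n := range res.Comparable().Flattened.Networks {
		total += n.MBits
	}
	fmt.Println(total) // 800: both levels are visible to bandwidth math
}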
diff --git a/scheduler/preemption_test.go b/scheduler/preemption_test.go
index 3d7f702f3..798c3985c 100644
--- a/scheduler/preemption_test.go
+++ b/scheduler/preemption_test.go
@@ -512,7 +512,7 @@ func TestPreemption(t *testing.T) {
 					},
 				},
 			}),
-			createAlloc(allocIDs[1], lowPrioJob, &structs.Resources{
+			createAllocWithTaskgroupNetwork(allocIDs[1], lowPrioJob, &structs.Resources{
 				CPU:      200,
 				MemoryMB: 256,
 				DiskMB:   4 * 1024,
@@ -520,9 +520,13 @@ func TestPreemption(t *testing.T) {
 					{
 						Device: "eth0",
 						IP:     "192.168.0.200",
-						MBits:  500,
+						MBits:  200,
 					},
 				},
+			}, &structs.NetworkResource{
+				Device: "eth0",
+				IP:     "192.168.0.201",
+				MBits:  300,
 			}),
 			createAlloc(allocIDs[2], lowPrioJob, &structs.Resources{
 				CPU:      200,
@@ -1379,10 +1383,19 @@
 // helper method to create allocations with given jobs and resources
 func createAlloc(id string, job *structs.Job, resource *structs.Resources) *structs.Allocation {
-	return createAllocWithDevice(id, job, resource, nil)
+	return createAllocInner(id, job, resource, nil, nil)
+}
+
+// helper method to create allocation with network at the task group level
+func createAllocWithTaskgroupNetwork(id string, job *structs.Job, resource *structs.Resources, tgNet *structs.NetworkResource) *structs.Allocation {
+	return createAllocInner(id, job, resource, nil, tgNet)
 }
 
 func createAllocWithDevice(id string, job *structs.Job, resource *structs.Resources, allocatedDevices *structs.AllocatedDeviceResource) *structs.Allocation {
+	return createAllocInner(id, job, resource, allocatedDevices, nil)
+}
+
+func createAllocInner(id string, job *structs.Job, resource *structs.Resources, allocatedDevices *structs.AllocatedDeviceResource, tgNetwork *structs.NetworkResource) *structs.Allocation {
 	alloc := &structs.Allocation{
 		ID:  id,
 		Job: job,
@@ -1413,5 +1426,11 @@ func createAllocWithDevice(id string, job *structs.Job, resource *structs.Resour
 	if allocatedDevices != nil {
 		alloc.AllocatedResources.Tasks["web"].Devices = []*structs.AllocatedDeviceResource{allocatedDevices}
 	}
+
+	if tgNetwork != nil {
+		alloc.AllocatedResources.Shared = structs.AllocatedSharedResources{
+			Networks: []*structs.NetworkResource{tgNetwork},
+		}
+	}
 	return alloc
 }
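For reviewers tracing the fixture change above: the converted alloc keeps the same 500 MBits total footprint, just split as 200 MBits at task level and 300 MBits at group level, so the preemption scenario is unchanged while exercising the new code path. A minimal sketch (not part of the patch; node and alloc are stand-ins for the test fixtures) of how the NetworkIndex change in network.go accounts for both levels:

// Sketch only, assuming the patched nomad/structs package.
func reservedBandwidth(node *structs.Node, alloc *structs.Allocation) bool {
	idx := structs.NewNetworkIndex()
	idx.SetNode(node) // the fixture node offers 1000 MBits on eth0
	// With this patch AddAllocs reserves the task-level 200 MBits and the
	// group-level 300 MBits; previously only task-level networks counted.
	return idx.AddAllocs([]*structs.Allocation{alloc})
}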
diff --git a/scheduler/rank_test.go b/scheduler/rank_test.go
index 65fe65b9d..b1387b475 100644
--- a/scheduler/rank_test.go
+++ b/scheduler/rank_test.go
@@ -6,7 +6,7 @@ import (
 	"github.com/hashicorp/nomad/helper/uuid"
 	"github.com/hashicorp/nomad/nomad/mock"
 	"github.com/hashicorp/nomad/nomad/structs"
-	require "github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/require"
 )
 
 func TestFeasibleRankIterator(t *testing.T) {
@@ -127,6 +127,246 @@ func TestBinPackIterator_NoExistingAlloc(t *testing.T) {
 	}
 }
 
+// Tests bin packing iterator with network resources at task and task group level
+func TestBinPackIterator_Network_Success(t *testing.T) {
+	_, ctx := testContext(t)
+	nodes := []*RankedNode{
+		{
+			Node: &structs.Node{
+				// Perfect fit
+				NodeResources: &structs.NodeResources{
+					Cpu: structs.NodeCpuResources{
+						CpuShares: 2048,
+					},
+					Memory: structs.NodeMemoryResources{
+						MemoryMB: 2048,
+					},
+					Networks: []*structs.NetworkResource{
+						{
+							Mode:   "host",
+							Device: "eth0",
+							CIDR:   "192.168.0.100/32",
+							MBits:  1000,
+						},
+					},
+				},
+				ReservedResources: &structs.NodeReservedResources{
+					Cpu: structs.NodeReservedCpuResources{
+						CpuShares: 1024,
+					},
+					Memory: structs.NodeReservedMemoryResources{
+						MemoryMB: 1024,
+					},
+					Networks: structs.NodeReservedNetworkResources{
+						ReservedHostPorts: "1000-2000",
+					},
+				},
+			},
+		},
+		{
+			Node: &structs.Node{
+				// 50% fit
+				NodeResources: &structs.NodeResources{
+					Cpu: structs.NodeCpuResources{
+						CpuShares: 4096,
+					},
+					Memory: structs.NodeMemoryResources{
+						MemoryMB: 4096,
+					},
+					Networks: []*structs.NetworkResource{
+						{
+							Mode:   "host",
+							Device: "eth0",
+							CIDR:   "192.168.0.100/32",
+							MBits:  1000,
+						},
+					},
+				},
+				ReservedResources: &structs.NodeReservedResources{
+					Cpu: structs.NodeReservedCpuResources{
+						CpuShares: 1024,
+					},
+					Memory: structs.NodeReservedMemoryResources{
+						MemoryMB: 1024,
+					},
+					Networks: structs.NodeReservedNetworkResources{
+						ReservedHostPorts: "1000-2000",
+					},
+				},
+			},
+		},
+	}
+	static := NewStaticRankIterator(ctx, nodes)
+
+	// Create a task group with networks specified at task and task group level
+	taskGroup := &structs.TaskGroup{
+		EphemeralDisk: &structs.EphemeralDisk{},
+		Tasks: []*structs.Task{
+			{
+				Name: "web",
+				Resources: &structs.Resources{
+					CPU:      1024,
+					MemoryMB: 1024,
+					Networks: []*structs.NetworkResource{
+						{
+							Device: "eth0",
+							MBits:  300,
+						},
+					},
+				},
+			},
+		},
+		Networks: []*structs.NetworkResource{
+			{
+				Device: "eth0",
+				MBits:  500,
+			},
+		},
+	}
+	binp := NewBinPackIterator(ctx, static, false, 0)
+	binp.SetTaskGroup(taskGroup)
+
+	scoreNorm := NewScoreNormalizationIterator(ctx, binp)
+
+	out := collectRanked(scoreNorm)
+	require := require.New(t)
+
+	// We expect both nodes to be eligible to place
+	require.Len(out, 2)
+	require.Equal(out[0], nodes[0])
+	require.Equal(out[1], nodes[1])
+
+	// First node should have a perfect score
+	require.Equal(1.0, out[0].FinalScore)
+
+	if out[1].FinalScore < 0.75 || out[1].FinalScore > 0.95 {
+		t.Fatalf("Bad Score: %v", out[1].FinalScore)
+	}
+
+	// Verify network information at taskgroup level
+	require.Equal(500, out[0].AllocResources.Networks[0].MBits)
+	require.Equal(500, out[1].AllocResources.Networks[0].MBits)
+
+	// Verify network information at task level
+	require.Equal(300, out[0].TaskResources["web"].Networks[0].MBits)
+	require.Equal(300, out[1].TaskResources["web"].Networks[0].MBits)
+}
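A hand-check of the expected scores (not part of the patch): after subtracting the 1024 reserved CPU shares and 1024 reserved MB, the first node offers exactly the task group's 1024/1024 ask, hence the perfect 1.0 score, while the second offers 3072/3072 and lands mid-range. The bandwidth ask on either node is the sum of both levels:

package main

import "fmt"

func main() {
	// Node 1: 2048 total minus 1024 reserved leaves exactly the group's ask.
	fmt.Println(2048-1024 == 1024) // true: perfect fit, FinalScore 1.0

	// Total bandwidth ask: task-level plus group-level MBits.
	fmt.Println(300 + 500) // 800 of the 1000 MBits on eth0
}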
+
+// Tests that bin packing iterator fails due to overprovisioning of network
+// This test has network resources at task group and task level
+func TestBinPackIterator_Network_Failure(t *testing.T) {
+	_, ctx := testContext(t)
+	nodes := []*RankedNode{
+		{
+			Node: &structs.Node{
+				// 50% fit
+				NodeResources: &structs.NodeResources{
+					Cpu: structs.NodeCpuResources{
+						CpuShares: 4096,
+					},
+					Memory: structs.NodeMemoryResources{
+						MemoryMB: 4096,
+					},
+					Networks: []*structs.NetworkResource{
+						{
+							Mode:   "host",
+							Device: "eth0",
+							CIDR:   "192.168.0.100/32",
+							MBits:  1000,
+						},
+					},
+				},
+				ReservedResources: &structs.NodeReservedResources{
+					Cpu: structs.NodeReservedCpuResources{
+						CpuShares: 1024,
+					},
+					Memory: structs.NodeReservedMemoryResources{
+						MemoryMB: 1024,
+					},
+					Networks: structs.NodeReservedNetworkResources{
+						ReservedHostPorts: "1000-2000",
+					},
+				},
+			},
+		},
+	}
+
+	// Add a planned alloc that takes up some network mbits at task and task group level
+	plan := ctx.Plan()
+	plan.NodeAllocation[nodes[0].Node.ID] = []*structs.Allocation{
+		{
+			AllocatedResources: &structs.AllocatedResources{
+				Tasks: map[string]*structs.AllocatedTaskResources{
+					"web": {
+						Cpu: structs.AllocatedCpuResources{
+							CpuShares: 2048,
+						},
+						Memory: structs.AllocatedMemoryResources{
+							MemoryMB: 2048,
+						},
+						Networks: []*structs.NetworkResource{
+							{
+								Device: "eth0",
+								IP:     "192.168.0.1",
+								MBits:  300,
+							},
+						},
+					},
+				},
+				Shared: structs.AllocatedSharedResources{
+					Networks: []*structs.NetworkResource{
+						{
+							Device: "eth0",
+							IP:     "192.168.0.1",
+							MBits:  400,
+						},
+					},
+				},
+			},
+		},
+	}
+	static := NewStaticRankIterator(ctx, nodes)
+
+	// Create a task group with networks specified at task and task group level
+	taskGroup := &structs.TaskGroup{
+		EphemeralDisk: &structs.EphemeralDisk{},
+		Tasks: []*structs.Task{
+			{
+				Name: "web",
+				Resources: &structs.Resources{
+					CPU:      1024,
+					MemoryMB: 1024,
+					Networks: []*structs.NetworkResource{
+						{
+							Device: "eth0",
+							MBits:  300,
+						},
+					},
+				},
+			},
+		},
+		Networks: []*structs.NetworkResource{
+			{
+				Device: "eth0",
+				MBits:  500,
+			},
+		},
+	}
+
+	binp := NewBinPackIterator(ctx, static, false, 0)
+	binp.SetTaskGroup(taskGroup)
+
+	scoreNorm := NewScoreNormalizationIterator(ctx, binp)
+
+	out := collectRanked(scoreNorm)
+	require := require.New(t)
+
+	// We expect a placement failure because we need 800 mbits of network
+	// and only 300 is free (1000 total minus the planned alloc's 300 + 400)
+	require.Len(out, 0)
+	require.Equal(1, ctx.metrics.DimensionExhausted["network: bandwidth exceeded"])
+}
+
 func TestBinPackIterator_PlannedAlloc(t *testing.T) {
 	_, ctx := testContext(t)
 	nodes := []*RankedNode{
diff --git a/scheduler/system_sched.go b/scheduler/system_sched.go
index b0fab7756..993533116 100644
--- a/scheduler/system_sched.go
+++ b/scheduler/system_sched.go
@@ -344,6 +344,10 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			},
 		}
 
+		if option.AllocResources != nil {
+			resources.Shared.Networks = option.AllocResources.Networks
+		}
+
 		// Create an allocation for this
 		alloc := &structs.Allocation{
 			ID:      uuid.Generate(),
@@ -360,7 +364,8 @@ func (s *SystemScheduler) computePlacements(place []allocTuple) error {
 			DesiredStatus: structs.AllocDesiredStatusRun,
 			ClientStatus:  structs.AllocClientStatusPending,
 			SharedResources: &structs.Resources{
-				DiskMB: missing.TaskGroup.EphemeralDisk.SizeMB,
+				DiskMB:   missing.TaskGroup.EphemeralDisk.SizeMB,
+				Networks: missing.TaskGroup.Networks,
 			},
 		}
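Not part of the patch, but for context on the system scheduler change above: the binpacker sizes group networks into option.AllocResources, computePlacements copies them onto AllocatedResources.Shared, and the older SharedResources field carries the group networks for compatibility. A sketch of reading them back off a placed allocation (groupMBits is a hypothetical helper, not a Nomad API):

package main

import (
	"fmt"

	"github.com/hashicorp/nomad/nomad/structs"
)

// groupMBits sums group-level bandwidth, which rides on
// AllocatedResources.Shared rather than on any single task.
func groupMBits(alloc *structs.Allocation) int {
	total := 0
	for _, n := range alloc.AllocatedResources.Shared.Networks {
		total += n.MBits
	}
	return total
}

func main() {
	alloc := &structs.Allocation{
		AllocatedResources: &structs.AllocatedResources{
			Shared: structs.AllocatedSharedResources{
				Networks: []*structs.NetworkResource{{Device: "eth0", MBits: 500}},
			},
		},
	}
	fmt.Println(groupMBits(alloc)) // 500
}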