Unmark drain when nodes hit their deadline and only batch/system allocs remain, and add an integration test covering all job types

Alex Dadgar
2018-03-28 11:57:47 -07:00
committed by Alex Dadgar
parent 35f42b1fca
commit 3099ef05e2
5 changed files with 192 additions and 28 deletions

View File

@@ -248,10 +248,19 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
continue
}
n.logger.Printf("[DEBUG] nomad.drain: node %q deadlined causing %d allocs to be force stopped", node, len(allocs))
forceStop = append(forceStop, allocs...)
}
n.l.RUnlock()
n.batchDrainAllocs(forceStop)
// Submit the node transitions in a sharded form to ensure a reasonable
// Raft transaction size.
for _, nodes := range partitionIds(nodes) {
if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
}
}
}
// handleJobAllocDrain handles marking a set of allocations as having a desired
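The sharding above is the heart of the fix: unsetting drain for every deadlined node in a single Raft apply could produce an oversized transaction, so the node IDs are chunked before each NodesDrainComplete call. A minimal sketch of what an ID-partitioning helper like partitionIds might look like, assuming a fixed chunk size (the real constant in Nomad may differ):

// maxIdsPerTxn caps how many IDs go into a single Raft transaction;
// the value here is an assumption for illustration.
const maxIdsPerTxn = 64

// partitionIds splits a list of IDs into chunks of at most
// maxIdsPerTxn entries so each Raft apply stays reasonably sized.
func partitionIds(ids []string) [][]string {
	var chunks [][]string
	for len(ids) > maxIdsPerTxn {
		chunks = append(chunks, ids[:maxIdsPerTxn])
		ids = ids[maxIdsPerTxn:]
	}
	if len(ids) > 0 {
		chunks = append(chunks, ids)
	}
	return chunks
}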

View File

@@ -38,10 +38,13 @@ func allocPromoter(t *testing.T, ctx context.Context,
// For each alloc that doesn't have its deployment status set, set it
var updates []*structs.Allocation
for _, alloc := range allocs {
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil {
if alloc.Job.Type != structs.JobTypeService {
continue
}
if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil {
continue
}
newAlloc := alloc.Copy()
newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{
Healthy: helper.BoolToPtr(true),
@@ -59,11 +62,11 @@ func allocPromoter(t *testing.T, ctx context.Context,
Alloc: updates,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var resp structs.NodeAllocsResponse
var resp structs.GenericResponse
if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", req, &resp); err != nil {
if ctx.Err() == context.Canceled {
return
} else {
} else if err != nil {
require.Nil(t, err)
}
}
@@ -323,3 +326,155 @@ func TestDrainer_DrainEmptyNode(t *testing.T) {
t.Fatalf("err: %v", err)
})
}
func TestDrainer_AllTypes_Deadline(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
// Create two nodes, registering the second later
n1, n2 := mock.Node(), mock.Node()
nodeReg := &structs.NodeRegisterRequest{
Node: n1,
WriteRequest: structs.WriteRequest{Region: "global"},
}
var nodeResp structs.NodeUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Create a service job that runs on just one node
job := mock.Job()
job.TaskGroups[0].Count = 2
req := &structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
var resp structs.JobRegisterResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
// Create a system job
sysjob := mock.SystemJob()
req = &structs.JobRegisterRequest{
Job: sysjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
// Create a batch job
bjob := mock.BatchJob()
bjob.TaskGroups[0].Count = 2
req = &structs.JobRegisterRequest{
Job: bjob,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
// Fetch the response
require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
require.NotZero(resp.Index)
// Wait for the allocations to be placed
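// n1 should hold all 5 allocations: 2 service + 2 batch + 1 system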
state := s1.State()
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Create the second node
nodeReg = &structs.NodeRegisterRequest{
Node: n2,
WriteRequest: structs.WriteRequest{Region: "global"},
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
// Drain the node
drainReq := &structs.NodeUpdateDrainRequest{
NodeID: n1.ID,
DrainStrategy: &structs.DrainStrategy{
DrainSpec: structs.DrainSpec{
Deadline: 2 * time.Second,
},
},
WriteRequest: structs.WriteRequest{Region: "global"},
}
var drainResp structs.NodeDrainUpdateResponse
require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
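// Once the 2s deadline fires, any allocs that have not yet migrated
// (here the batch and system ones) are force-stopped and the node's
// drain flag is cleared by handleDeadlinedNodes above.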
// Wait for the allocs to be replaced
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger)
// Wait for the allocs to be stopped
var finalAllocs []*structs.Allocation
testutil.WaitForResult(func() (bool, error) {
var err error
finalAllocs, err = state.AllocsByNode(nil, n1.ID)
if err != nil {
return false, err
}
for _, alloc := range finalAllocs {
if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
}
}
return true, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Check that the node drain is removed
testutil.WaitForResult(func() (bool, error) {
node, err := state.NodeByID(nil, n1.ID)
if err != nil {
return false, err
}
return node.DrainStrategy == nil, fmt.Errorf("drain strategy still set")
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Wait for the allocations to be placed on the other node
testutil.WaitForResult(func() (bool, error) {
allocs, err := state.AllocsByNode(nil, n2.ID)
if err != nil {
return false, err
}
return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
}, func(err error) {
t.Fatalf("err: %v", err)
})
// Assert that the service allocs finished draining before the batch and system allocs
var serviceMax, batchMax uint64 = 0, 0
for _, alloc := range finalAllocs {
if alloc.Job.Type == structs.JobTypeService && alloc.ModifyIndex > serviceMax {
serviceMax = alloc.ModifyIndex
} else if alloc.Job.Type == structs.JobTypeBatch && alloc.ModifyIndex > batchMax {
batchMax = alloc.ModifyIndex
}
}
require.True(serviceMax < batchMax)
}
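The final assertion relies on Raft indexes being monotonically increasing: every state transition bumps an alloc's ModifyIndex, so if the largest index among the service allocs is smaller than the largest among the batch allocs, all of the service stops were applied first. The scan could be factored into a small helper, sketched here (hypothetical, not part of the commit):

// maxModifyIndexByType returns the highest ModifyIndex among allocs
// of the given job type; a hypothetical refactor of the assertion
// loop at the end of TestDrainer_AllTypes_Deadline.
func maxModifyIndexByType(allocs []*structs.Allocation, jobType string) uint64 {
	var max uint64
	for _, a := range allocs {
		if a.Job.Type == jobType && a.ModifyIndex > max {
			max = a.ModifyIndex
		}
	}
	return max
}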

View File

@@ -15,10 +15,11 @@ func Node() *structs.Node {
Datacenter: "dc1",
Name: "foobar",
Attributes: map[string]string{
"kernel.name": "linux",
"arch": "x86",
"nomad.version": "0.5.0",
"driver.exec": "1",
"kernel.name": "linux",
"arch": "x86",
"nomad.version": "0.5.0",
"driver.exec": "1",
"driver.mock_driver": "1",
},
Resources: &structs.Resources{
CPU: 4000,
@@ -65,7 +66,7 @@ func Node() *structs.Node {
func Job() *structs.Job {
job := &structs.Job{
Region: "global",
ID: uuid.Generate(),
ID: fmt.Sprintf("mock-service-%s", uuid.Generate()),
Name: "my-job",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeService,
@@ -172,7 +173,7 @@ func Job() *structs.Job {
func BatchJob() *structs.Job {
job := &structs.Job{
Region: "global",
ID: uuid.Generate(),
ID: fmt.Sprintf("mock-batch-%s", uuid.Generate()),
Name: "batch-job",
Namespace: structs.DefaultNamespace,
Type: structs.JobTypeBatch,
@@ -239,7 +240,7 @@ func SystemJob() *structs.Job {
job := &structs.Job{
Region: "global",
Namespace: structs.DefaultNamespace,
ID: uuid.Generate(),
ID: fmt.Sprintf("mock-system-%s", uuid.Generate()),
Name: "my-job",
Type: structs.JobTypeSystem,
Priority: 100,
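Prefixing the mock job IDs (mock-service-, mock-batch-, mock-system-) pays off in mixed-job tests like the one above: failure output and state dumps now identify the job type at a glance, and allocs can be filtered by type straight from their job ID. For example (a hypothetical helper, not in the commit):

// batchAllocs keeps only the allocs that belong to a mock batch job,
// relying on the new "mock-batch-" ID prefix; purely illustrative.
func batchAllocs(allocs []*structs.Allocation) []*structs.Allocation {
	var out []*structs.Allocation
	for _, a := range allocs {
		if strings.HasPrefix(a.JobID, "mock-batch-") {
			out = append(out, a)
		}
	}
	return out
}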

View File

@@ -92,6 +92,15 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
continue
}
// If we have been marked for migration and aren't terminal, migrate
if !exist.TerminalStatus() && exist.DesiredTransition.ShouldMigrate() {
result.migrate = append(result.migrate, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
continue
}
// If we are on a tainted node, we must migrate if we are a service or
// if the batch allocation did not finish
if node, ok := taintedNodes[exist.NodeID]; ok {
@@ -104,22 +113,12 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
goto IGNORE
}
if !exist.TerminalStatus() {
if node == nil || node.TerminalStatus() {
result.lost = append(result.lost, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
} else if exist.DesiredTransition.ShouldMigrate() {
result.migrate = append(result.migrate, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
} else {
goto IGNORE
}
if !exist.TerminalStatus() && (node == nil || node.TerminalStatus()) {
result.lost = append(result.lost, allocTuple{
Name: name,
TaskGroup: tg,
Alloc: exist,
})
} else {
goto IGNORE
}
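Hoisting the migrate check above the tainted-node handling simplifies the logic: an alloc the server has marked for migration migrates regardless of node state, and the tainted branch now only has to distinguish lost from ignore. Condensed, the per-alloc precedence reads roughly as follows (migrate, lost, ignore, and onTaintedNode are stand-ins for the surrounding reconciler plumbing, not the real helpers):

switch {
case exist.TerminalStatus():
	// terminal allocs are handled by the earlier branches
case exist.DesiredTransition.ShouldMigrate():
	migrate(exist) // server marked it for migration (e.g. node drain)
case onTaintedNode && (node == nil || node.TerminalStatus()):
	lost(exist) // node is down or deregistered; the alloc cannot migrate
default:
	ignore(exist)
}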

View File

@@ -18,7 +18,7 @@ type testFn func() (bool, error)
type errorFn func(error)
func WaitForResult(test testFn, error errorFn) {
WaitForResultRetries(2000*TestMultiplier(), test, error)
WaitForResultRetries(500*TestMultiplier(), test, error)
}
func WaitForResultRetries(retries int64, test testFn, error errorFn) {
@@ -56,7 +56,7 @@ func AssertUntil(until time.Duration, test testFn, error errorFn) {
// the tests are being run under.
func TestMultiplier() int64 {
if IsTravis() {
return 3
return 4
}
return 1
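With the constants above, the retry budget works out to 500 * 1 = 500 attempts locally (down from 2000) and 500 * 4 = 2000 on Travis (down from 2000 * 3 = 6000): slower CI machines keep a generous budget while local failures surface four times faster.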