diff --git a/nomad/drainer/drainer.go b/nomad/drainer/drainer.go
index f167cf1f5..db345a2ee 100644
--- a/nomad/drainer/drainer.go
+++ b/nomad/drainer/drainer.go
@@ -248,10 +248,19 @@ func (n *NodeDrainer) handleDeadlinedNodes(nodes []string) {
 			continue
 		}
 
+		n.logger.Printf("[DEBUG] nomad.drain: node %q deadlined causing %d allocs to be force stopped", node, len(allocs))
 		forceStop = append(forceStop, allocs...)
 	}
 	n.l.RUnlock()
 	n.batchDrainAllocs(forceStop)
+
+	// Submit the node transitions in a sharded form to ensure a reasonable
+	// Raft transaction size.
+	for _, nodes := range partitionIds(nodes) {
+		if _, err := n.raft.NodesDrainComplete(nodes); err != nil {
+			n.logger.Printf("[ERR] nomad.drain: failed to unset drain for nodes: %v", err)
+		}
+	}
 }
 
 // handleJobAllocDrain handles marking a set of allocations as having a desired
diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go
index f71363a0d..aa6e255b5 100644
--- a/nomad/drainer_int_test.go
+++ b/nomad/drainer_int_test.go
@@ -38,10 +38,13 @@ func allocPromoter(t *testing.T, ctx context.Context,
 	// For each alloc that doesn't have its deployment status set, set it
 	var updates []*structs.Allocation
 	for _, alloc := range allocs {
-		if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil {
+		if alloc.Job.Type != structs.JobTypeService {
 			continue
 		}
 
+		if alloc.DeploymentStatus != nil && alloc.DeploymentStatus.Healthy != nil {
+			continue
+		}
 		newAlloc := alloc.Copy()
 		newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{
 			Healthy: helper.BoolToPtr(true),
@@ -59,11 +62,11 @@
 		Alloc:        updates,
 		WriteRequest: structs.WriteRequest{Region: "global"},
 	}
-	var resp structs.NodeAllocsResponse
+	var resp structs.GenericResponse
 	if err := msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", req, &resp); err != nil {
 		if ctx.Err() == context.Canceled {
 			return
-		} else {
+		} else if err != nil {
 			require.Nil(t, err)
 		}
 	}
@@ -323,3 +326,155 @@ func TestDrainer_DrainEmptyNode(t *testing.T) {
 		t.Fatalf("err: %v", err)
 	})
 }
+
+func TestDrainer_AllTypes_Deadline(t *testing.T) {
+	t.Parallel()
+	require := require.New(t)
+	s1 := TestServer(t, nil)
+	defer s1.Shutdown()
+	codec := rpcClient(t, s1)
+	testutil.WaitForLeader(t, s1.RPC)
+
+	// Create two nodes, registering the second later
+	n1, n2 := mock.Node(), mock.Node()
+	nodeReg := &structs.NodeRegisterRequest{
+		Node:         n1,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var nodeResp structs.NodeUpdateResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+	// Create a service job that runs on just one
+	job := mock.Job()
+	job.TaskGroups[0].Count = 2
+	req := &structs.JobRegisterRequest{
+		Job: job,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	var resp structs.JobRegisterResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+
+	// Create a system job
+	sysjob := mock.SystemJob()
+	req = &structs.JobRegisterRequest{
+		Job: sysjob,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+
+	// Create a batch job
+	bjob := mock.BatchJob()
+	bjob.TaskGroups[0].Count = 2
+	req = &structs.JobRegisterRequest{
+		Job: bjob,
+		WriteRequest: structs.WriteRequest{
+			Region:    "global",
+			Namespace: job.Namespace,
+		},
+	}
+
+	// Fetch the response
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+	require.NotZero(resp.Index)
+
+	// Wait for the allocations to be placed
+	state := s1.State()
+	testutil.WaitForResult(func() (bool, error) {
+		allocs, err := state.AllocsByNode(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Create the second node
+	nodeReg = &structs.NodeRegisterRequest{
+		Node:         n2,
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
+
+	// Drain the node
+	drainReq := &structs.NodeUpdateDrainRequest{
+		NodeID: n1.ID,
+		DrainStrategy: &structs.DrainStrategy{
+			DrainSpec: structs.DrainSpec{
+				Deadline: 2 * time.Second,
+			},
+		},
+		WriteRequest: structs.WriteRequest{Region: "global"},
+	}
+	var drainResp structs.NodeDrainUpdateResponse
+	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
+
+	// Wait for the allocs to be replaced
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
+	go allocPromoter(t, ctx, state, codec, n2.ID, s1.logger)
+
+	// Wait for the allocs to be stopped
+	var finalAllocs []*structs.Allocation
+	testutil.WaitForResult(func() (bool, error) {
+		var err error
+		finalAllocs, err = state.AllocsByNode(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		for _, alloc := range finalAllocs {
+			if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
+				return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus)
+			}
+		}
+		return true, nil
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Check that the node drain is removed
+	testutil.WaitForResult(func() (bool, error) {
+		node, err := state.NodeByID(nil, n1.ID)
+		if err != nil {
+			return false, err
+		}
+		return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Wait for the allocations to be placed on the other node
+	testutil.WaitForResult(func() (bool, error) {
+		allocs, err := state.AllocsByNode(nil, n2.ID)
+		if err != nil {
+			return false, err
+		}
+		return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs))
+	}, func(err error) {
+		t.Fatalf("err: %v", err)
+	})
+
+	// Assert that the service finished before the batch and system
+	var serviceMax, batchMax uint64 = 0, 0
+	for _, alloc := range finalAllocs {
+		if alloc.Job.Type == structs.JobTypeService && alloc.ModifyIndex > serviceMax {
+			serviceMax = alloc.ModifyIndex
+		} else if alloc.Job.Type == structs.JobTypeBatch && alloc.ModifyIndex > batchMax {
+			batchMax = alloc.ModifyIndex
+		}
+	}
+	require.True(serviceMax < batchMax)
+}
diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go
index bf1df8ea1..c037289dd 100644
--- a/nomad/mock/mock.go
+++ b/nomad/mock/mock.go
@@ -15,10 +15,11 @@ func Node() *structs.Node {
 		Datacenter: "dc1",
 		Name:       "foobar",
 		Attributes: map[string]string{
-			"kernel.name":   "linux",
-			"arch":          "x86",
-			"nomad.version": "0.5.0",
-			"driver.exec":   "1",
+			"kernel.name":        "linux",
+			"arch":               "x86",
+			"nomad.version":      "0.5.0",
+			"driver.exec":        "1",
+			"driver.mock_driver": "1",
 		},
 		Resources: &structs.Resources{
 			CPU:      4000,
@@ -65,7 +66,7 @@ func Node() *structs.Node {
 func Job() *structs.Job {
 	job := &structs.Job{
 		Region:      "global",
-		ID:          uuid.Generate(),
+		ID:          fmt.Sprintf("mock-service-%s", uuid.Generate()),
 		Name:        "my-job",
 		Namespace:   structs.DefaultNamespace,
 		Type:        structs.JobTypeService,
@@ -172,7 +173,7 @@ func Job() *structs.Job {
 func BatchJob() *structs.Job {
 	job := &structs.Job{
 		Region:      "global",
-		ID:          uuid.Generate(),
+		ID:          fmt.Sprintf("mock-batch-%s", uuid.Generate()),
 		Name:        "batch-job",
 		Namespace:   structs.DefaultNamespace,
 		Type:        structs.JobTypeBatch,
@@ -239,7 +240,7 @@ func SystemJob() *structs.Job {
 	job := &structs.Job{
 		Region:      "global",
 		Namespace:   structs.DefaultNamespace,
-		ID:          uuid.Generate(),
+		ID:          fmt.Sprintf("mock-system-%s", uuid.Generate()),
 		Name:        "my-job",
 		Type:        structs.JobTypeSystem,
 		Priority:    100,
diff --git a/scheduler/util.go b/scheduler/util.go
index 6b589fa50..7fc9197fe 100644
--- a/scheduler/util.go
+++ b/scheduler/util.go
@@ -92,6 +92,15 @@ func diffAllocs(job *structs.Job, taintedNodes map[string]*structs.Node,
 			continue
 		}
 
+		// If we have been marked for migration and aren't terminal, migrate
+		if !exist.TerminalStatus() && exist.DesiredTransition.ShouldMigrate() {
+			result.migrate = append(result.migrate, allocTuple{
+				Name:      name,
+				TaskGroup: tg,
+				Alloc:     exist,
+			})
+			continue
+		}
 		// If we are on a tainted node, we must migrate if we are a service or
 		// if the batch allocation did not finish
 		if node, ok := taintedNodes[exist.NodeID]; ok {
@@ -104,22 +113,12 @@
 				goto IGNORE
 			}
 
-			if !exist.TerminalStatus() {
-				if node == nil || node.TerminalStatus() {
-					result.lost = append(result.lost, allocTuple{
-						Name:      name,
-						TaskGroup: tg,
-						Alloc:     exist,
-					})
-				} else if exist.DesiredTransition.ShouldMigrate() {
-					result.migrate = append(result.migrate, allocTuple{
-						Name:      name,
-						TaskGroup: tg,
-						Alloc:     exist,
-					})
-				} else {
-					goto IGNORE
-				}
+			if !exist.TerminalStatus() && (node == nil || node.TerminalStatus()) {
+				result.lost = append(result.lost, allocTuple{
+					Name:      name,
+					TaskGroup: tg,
+					Alloc:     exist,
+				})
 			} else {
 				goto IGNORE
 			}
diff --git a/testutil/wait.go b/testutil/wait.go
index 11bf267a8..f3f993b87 100644
--- a/testutil/wait.go
+++ b/testutil/wait.go
@@ -18,7 +18,7 @@ type testFn func() (bool, error)
 type errorFn func(error)
 
 func WaitForResult(test testFn, error errorFn) {
-	WaitForResultRetries(2000*TestMultiplier(), test, error)
+	WaitForResultRetries(500*TestMultiplier(), test, error)
 }
 
 func WaitForResultRetries(retries int64, test testFn, error errorFn) {
@@ -56,7 +56,7 @@ func AssertUntil(until time.Duration, test testFn, error errorFn) {
 // the tests are being run under.
 func TestMultiplier() int64 {
 	if IsTravis() {
-		return 3
+		return 4
 	}
 
 	return 1
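
Note on the drainer.go hunk above: it depends on a partitionIds helper (not shown in this diff) to cap how many node IDs go into a single NodesDrainComplete Raft transaction. The sketch below is a rough, hypothetical illustration of what such a helper could look like; the chunk size and exact signature are assumptions, not taken from this change.

// partitionIds splits a list of IDs into bounded chunks so that each Raft
// transaction stays a reasonable size. Illustrative sketch only; the real
// helper and its per-transaction limit may differ.
func partitionIds(ids []string) [][]string {
	const maxIdsPerTxn = 64 // assumed per-transaction limit, for illustration
	var partitions [][]string
	for len(ids) > maxIdsPerTxn {
		partitions = append(partitions, ids[:maxIdsPerTxn])
		ids = ids[maxIdsPerTxn:]
	}
	if len(ids) > 0 {
		partitions = append(partitions, ids)
	}
	return partitions
}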