diff --git a/nomad/drainer/drain_heap.go b/nomad/drainer/drain_heap.go
index 5f93760c8..3d51513a9 100644
--- a/nomad/drainer/drain_heap.go
+++ b/nomad/drainer/drain_heap.go
@@ -91,6 +91,9 @@ func (d *deadlineHeap) watch() {
 			continue
 		}

+		// If the deadline is zero, it is a force drain. Otherwise, if the
+		// deadline is in the future, see if we already have a timer set up to
+		// handle it. If we don't, create the timer.
 		if deadline.IsZero() || !deadline.Equal(nextDeadline) {
 			timer.Reset(deadline.Sub(time.Now()))
 			nextDeadline = deadline
diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go
index cb1d147ae..b6cb88950 100644
--- a/nomad/drainer_int_test.go
+++ b/nomad/drainer_int_test.go
@@ -635,121 +635,136 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) {
 }

 // Test that transitions to force drain work.
-func TestDrainer_Batch_Force(t *testing.T) {
+func TestDrainer_Batch_TransitionToForce(t *testing.T) {
 	t.Parallel()
 	require := require.New(t)
-	s1 := TestServer(t, nil)
-	defer s1.Shutdown()
-	codec := rpcClient(t, s1)
-	testutil.WaitForLeader(t, s1.RPC)
-
-	// Create a node
-	n1 := mock.Node()
-	nodeReg := &structs.NodeRegisterRequest{
-		Node:         n1,
-		WriteRequest: structs.WriteRequest{Region: "global"},
-	}
-	var nodeResp structs.NodeUpdateResponse
-	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))
-
-	// Create a batch job
-	bjob := mock.BatchJob()
-	bjob.TaskGroups[0].Count = 2
-	req := &structs.JobRegisterRequest{
-		Job: bjob,
-		WriteRequest: structs.WriteRequest{
-			Region:    "global",
-			Namespace: bjob.Namespace,
-		},
-	}
-
-	// Fetch the response
-	var resp structs.JobRegisterResponse
-	require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
-	require.NotZero(resp.Index)
-
-	// Wait for the allocations to be placed
-	state := s1.State()
-	testutil.WaitForResult(func() (bool, error) {
-		allocs, err := state.AllocsByNode(nil, n1.ID)
-		if err != nil {
-			return false, err
+	for _, inf := range []bool{true, false} {
+		name := "Infinite"
+		if !inf {
+			name = "Deadline"
 		}
-		return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
-	}, func(err error) {
-		t.Fatalf("err: %v", err)
-	})
+		t.Run(name, func(t *testing.T) {
+			s1 := TestServer(t, nil)
+			defer s1.Shutdown()
+			codec := rpcClient(t, s1)
+			testutil.WaitForLeader(t, s1.RPC)

-	// Drain the node
-	drainReq := &structs.NodeUpdateDrainRequest{
-		NodeID: n1.ID,
-		DrainStrategy: &structs.DrainStrategy{
-			DrainSpec: structs.DrainSpec{
-				Deadline: 0 * time.Second, // Infinite
-			},
-		},
-		WriteRequest: structs.WriteRequest{Region: "global"},
-	}
-	var drainResp structs.NodeDrainUpdateResponse
-	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
-
-	// Wait for the allocs to be replaced
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
-
-	// Make sure the batch job isn't affected
-	testutil.AssertUntil(500*time.Millisecond, func() (bool, error) {
-		allocs, err := state.AllocsByNode(nil, n1.ID)
-		if err != nil {
-			return false, err
-		}
-		for _, alloc := range allocs {
-			if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
-				return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
+			// Create a node
+			n1 := mock.Node()
+			nodeReg := &structs.NodeRegisterRequest{
+				Node:         n1,
+				WriteRequest: structs.WriteRequest{Region: "global"},
 			}
-		}
-		return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
-	}, func(err error) {
-		t.Fatalf("err: %v", err)
-	})
+			var nodeResp structs.NodeUpdateResponse
+			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp))

-	// Foce drain the node
-	drainReq = &structs.NodeUpdateDrainRequest{
-		NodeID: n1.ID,
-		DrainStrategy: &structs.DrainStrategy{
-			DrainSpec: structs.DrainSpec{
-				Deadline: -1 * time.Second, // Infinite
-			},
-		},
-		WriteRequest: structs.WriteRequest{Region: "global"},
-	}
-	require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
-
-	// Make sure the batch job is migrated
-	testutil.WaitForResult(func() (bool, error) {
-		allocs, err := state.AllocsByNode(nil, n1.ID)
-		if err != nil {
-			return false, err
-		}
-		for _, alloc := range allocs {
-			if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
-				return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
+			// Create a batch job
+			bjob := mock.BatchJob()
+			bjob.TaskGroups[0].Count = 2
+			req := &structs.JobRegisterRequest{
+				Job: bjob,
+				WriteRequest: structs.WriteRequest{
+					Region:    "global",
+					Namespace: bjob.Namespace,
+				},
 			}
-		}
-		return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
-	}, func(err error) {
-		t.Fatalf("err: %v", err)
-	})
-	// Check that the node drain is removed
-	testutil.WaitForResult(func() (bool, error) {
-		node, err := state.NodeByID(nil, n1.ID)
-		if err != nil {
-			return false, err
-		}
-		return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
-	}, func(err error) {
-		t.Fatalf("err: %v", err)
-	})
+
+			// Fetch the response
+			var resp structs.JobRegisterResponse
+			require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp))
+			require.NotZero(resp.Index)
+
+			// Wait for the allocations to be placed
+			state := s1.State()
+			testutil.WaitForResult(func() (bool, error) {
+				allocs, err := state.AllocsByNode(nil, n1.ID)
+				if err != nil {
+					return false, err
+				}
+				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+
+			// Pick the deadline
+			deadline := 0 * time.Second
+			if !inf {
+				deadline = 10 * time.Second
+			}
+
+			// Drain the node
+			drainReq := &structs.NodeUpdateDrainRequest{
+				NodeID: n1.ID,
+				DrainStrategy: &structs.DrainStrategy{
+					DrainSpec: structs.DrainSpec{
+						Deadline: deadline,
+					},
+				},
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+			var drainResp structs.NodeDrainUpdateResponse
+			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
+
+			// Wait for the allocs to be replaced
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			go allocPromoter(t, ctx, state, codec, n1.ID, s1.logger)
+
+			// Make sure the batch job isn't affected
+			testutil.AssertUntil(500*time.Millisecond, func() (bool, error) {
+				allocs, err := state.AllocsByNode(nil, n1.ID)
+				if err != nil {
+					return false, err
+				}
+				for _, alloc := range allocs {
+					if alloc.DesiredStatus != structs.AllocDesiredStatusRun {
+						return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
+					}
+				}
+				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+
+			// Force drain the node
+			drainReq = &structs.NodeUpdateDrainRequest{
+				NodeID: n1.ID,
+				DrainStrategy: &structs.DrainStrategy{
+					DrainSpec: structs.DrainSpec{
+						Deadline: -1 * time.Second, // Force
+					},
+				},
+				WriteRequest: structs.WriteRequest{Region: "global"},
+			}
+			require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp))
+
+			// Make sure the batch job is migrated
+			testutil.WaitForResult(func() (bool, error) {
+				allocs, err := state.AllocsByNode(nil, n1.ID)
+				if err != nil {
+					return false, err
+				}
+				for _, alloc := range allocs {
+					if alloc.DesiredStatus != structs.AllocDesiredStatusStop {
+						return false, fmt.Errorf("got status %v", alloc.DesiredStatus)
+					}
+				}
+				return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs))
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+
+			// Check that the node drain is removed
+			testutil.WaitForResult(func() (bool, error) {
+				node, err := state.NodeByID(nil, n1.ID)
+				if err != nil {
+					return false, err
+				}
+				return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set")
+			}, func(err error) {
+				t.Fatalf("err: %v", err)
+			})
+		})
+	}
 }