diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index a9162c540..76d70e7de 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -6,15 +6,16 @@ package nomad import ( "context" "fmt" - "net/rpc" "testing" "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/drainer" @@ -22,16 +23,21 @@ import ( "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/stretchr/testify/require" ) -func allocPromoter(errCh chan<- error, ctx context.Context, - state *state.StateStore, codec rpc.ClientCodec, nodeID string, - logger log.Logger) { +// allocClientStateSimulator simulates the updates in state from the +// client. Allocations that are new on the server get marked with healthy +// deployments, and allocations that are DesiredStatus=stop on the server get +// updated with terminal client status. +func allocClientStateSimulator(t *testing.T, errCh chan<- error, ctx context.Context, + srv *Server, nodeID string, logger log.Logger) { + + codec := rpcClient(t, srv) + store := srv.State() nindex := uint64(1) for { - allocs, index, err := getNodeAllocs(ctx, state, nodeID, nindex) + allocs, index, err := getNodeAllocs(ctx, store, nodeID, nindex) if err != nil { if err == context.Canceled { return @@ -51,7 +57,7 @@ func allocPromoter(errCh chan<- error, ctx context.Context, } if alloc.DeploymentStatus.HasHealth() { - continue + continue // only update to healthy once } newAlloc := alloc.Copy() newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{ @@ -59,7 +65,7 @@ func allocPromoter(errCh chan<- error, ctx context.Context, Timestamp: now, } updates = append(updates, newAlloc) - logger.Trace("marked deployment health for alloc", "alloc_id", alloc.ID) + logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID) } if len(updates) == 0 { @@ -83,7 +89,7 @@ func allocPromoter(errCh chan<- error, ctx context.Context, } // checkAllocPromoter is a small helper to return an error or nil from an error -// chan like the one given to the allocPromoter goroutine. +// chan like the one given to the allocClientStateSimulator goroutine. 
func checkAllocPromoter(errCh chan error) error { select { case err := <-errCh: @@ -93,8 +99,8 @@ func checkAllocPromoter(errCh chan error) error { } } -func getNodeAllocs(ctx context.Context, state *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) { - resp, index, err := state.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx) +func getNodeAllocs(ctx context.Context, store *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) { + resp, index, err := store.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx) if err != nil { return nil, 0, err } @@ -105,16 +111,16 @@ func getNodeAllocs(ctx context.Context, state *state.StateStore, nodeID string, return resp.([]*structs.Allocation), index, nil } -func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { - return func(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { +func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) { + return func(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) { // Capture all the allocations - allocs, err := state.AllocsByNode(ws, nodeID) + allocs, err := store.AllocsByNode(ws, nodeID) if err != nil { return nil, 0, err } // Use the last index that affected the jobs table - index, err := state.Index("allocs") + index, err := store.Index("allocs") if err != nil { return nil, index, err } @@ -125,23 +131,23 @@ func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, state *state.State func TestDrainer_Simple_ServiceOnly(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create two nodes - n1, n2 := mock.Node(), mock.Node() + // Create a node + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Create a job that runs on just one + // Create a job that runs on that node job := mock.Job() job.TaskGroups[0].Count = 2 req := &structs.JobRegisterRequest{ @@ -151,30 +157,20 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Wait for the two allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForPlacedAllocs(t, store, n1.ID, 2) // Create the second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, 
"Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ @@ -187,55 +183,31 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for the allocs to be replaced + waitForAllocsStop(t, store, n1.ID, nil) + waitForPlacedAllocs(t, store, n2.ID, 2) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "") } func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() // Create a node n1 := mock.Node() @@ -244,9 +216,9 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Create a job that runs on just one + // Create a job that runs on it job := mock.Job() job.Update = *structs.DefaultUpdateStrategy job.Update.Stagger = 30 * time.Second @@ -258,23 +230,12 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // 
Wait for the two allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForPlacedAllocs(t, store, n1.ID, 2) // Drain the node drainReq := &structs.NodeUpdateDrainRequest{ @@ -287,61 +248,32 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be stopped - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - for _, alloc := range allocs { - if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) - } - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for the allocs to be stopped (but not replaced) + waitForAllocsStop(t, store, n1.ID, nil) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) - require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, nil, 3, drainer.NodeDrainEventDetailDeadlined) } func TestDrainer_DrainEmptyNode(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create a node + // Create an empty node n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Drain the node drainReq := &structs.NodeUpdateDrainRequest{ @@ -354,47 +286,31 @@ func TestDrainer_DrainEmptyNode(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Check that the node drain is removed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - 
if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, nil, 3, "") } func TestDrainer_AllTypes_Deadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create two nodes, registering the second later - n1, n2 := mock.Node(), mock.Node() + // Create a node + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Create a service job that runs on just one + // Create a service job that runs on it job := mock.Job() job.TaskGroups[0].Count = 2 req := &structs.JobRegisterRequest{ @@ -404,11 +320,9 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Create a system job sysjob := mock.SystemJob() @@ -419,10 +333,8 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Create a batch job bjob := mock.BatchJob() @@ -434,31 +346,21 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { Namespace: job.Namespace, }, } + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + // Wait for all the allocations to be placed + waitForPlacedAllocs(t, store, n1.ID, 5) - // Wait for the allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Create the second node + // Create a second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, 
&nodeResp)) - // Drain the node + // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ NodeID: n1.ID, DrainStrategy: &structs.DrainStrategy{ @@ -469,58 +371,18 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - // Wait for the allocs to be stopped - var finalAllocs []*structs.Allocation - testutil.WaitForResult(func() (bool, error) { - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } - - var err error - finalAllocs, err = state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - for _, alloc := range finalAllocs { - if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) - } - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Wait for the allocations to be placed on the other node - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for allocs to be replaced + finalAllocs := waitForAllocsStop(t, store, n1.ID, nil) + waitForPlacedAllocs(t, store, n2.ID, 5) // Assert that the service finished before the batch and system var serviceMax, batchMax uint64 = 0, 0 @@ -531,37 +393,32 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { batchMax = alloc.ModifyIndex } } - require.True(serviceMax < batchMax) + must.Less(t, batchMax, serviceMax) - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) - require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, nil, 3, drainer.NodeDrainEventDetailDeadlined) } // Test that drain is unset when batch jobs naturally finish func TestDrainer_AllTypes_NoDeadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, 
srv.RPC) + store := srv.State() // Create two nodes, registering the second later - n1, n2 := mock.Node(), mock.Node() + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Create a service job that runs on just one + // Create a service job job := mock.Job() job.TaskGroups[0].Count = 2 req := &structs.JobRegisterRequest{ @@ -571,11 +428,9 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Create a system job sysjob := mock.SystemJob() @@ -586,10 +441,8 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Create a batch job bjob := mock.BatchJob() @@ -601,31 +454,21 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { Namespace: job.Namespace, }, } + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + // Wait for all the allocations to be placed + waitForPlacedAllocs(t, store, n1.ID, 5) - // Wait for the allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Create the second node + // Create a second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Drain the node + // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ NodeID: n1.ID, DrainStrategy: &structs.DrainStrategy{ @@ -636,40 +479,36 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - // Wait for the service allocs to be stopped on the draining 
node - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false) - if err != nil { - return false, err - } + // Wait for the service allocs (only) to be stopped on the draining node + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + allocs, err := store.AllocsByJob(nil, job.Namespace, job.ID, false) + must.NoError(t, err) for _, alloc := range allocs { if alloc.NodeID != n1.ID { continue } if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) + return fmt.Errorf("got desired status %v", alloc.DesiredStatus) } } - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) + return checkAllocPromoter(errCh) + }), + wait.Timeout(10*time.Second), + wait.Gap(100*time.Millisecond), + )) // Mark the batch allocations as finished - allocs, err := state.AllocsByJob(nil, job.Namespace, bjob.ID, false) - require.Nil(err) + allocs, err := store.AllocsByJob(nil, job.Namespace, bjob.ID, false) + must.NoError(t, err) var updates []*structs.Allocation for _, alloc := range allocs { @@ -677,56 +516,34 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { new.ClientStatus = structs.AllocClientStatusComplete updates = append(updates, new) } - require.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1000, updates)) + index, _ := store.LatestIndex() + index++ + must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, updates)) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for the service allocations to be replaced + waitForPlacedAllocs(t, store, n2.ID, 3) - // Wait for the service allocations to be placed on the other node - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 3, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "") } func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create two nodes, registering the second later - n1, n2 := mock.Node(), mock.Node() + // Create a node + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + 
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Create a service job that runs on just one job := mock.Job() @@ -738,11 +555,9 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) job.CreateIndex = resp.JobModifyIndex // Create a system job @@ -754,10 +569,8 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) sysjob.CreateIndex = resp.JobModifyIndex // Create a batch job @@ -770,23 +583,12 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) bjob.CreateIndex = resp.JobModifyIndex // Wait for the allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForPlacedAllocs(t, store, n1.ID, 5) // Create some old terminal allocs for each job that point at a non-existent // node to simulate it being on a GC'd node. 
@@ -802,16 +604,17 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete badAllocs = append(badAllocs, alloc) } - require.NoError(state.UpsertAllocs(structs.MsgTypeTestSetup, 1, badAllocs)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1, badAllocs)) // Create the second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Drain the node + // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ NodeID: n1.ID, DrainStrategy: &structs.DrainStrategy{ @@ -822,99 +625,54 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - // Wait for the allocs to be stopped - var finalAllocs []*structs.Allocation - testutil.WaitForResult(func() (bool, error) { - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } + // Wait for the allocs to be replaced + waitForAllocsStop(t, store, n1.ID, errCh) + waitForPlacedAllocs(t, store, n2.ID, 5) - var err error - finalAllocs, err = state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - for _, alloc := range finalAllocs { - if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) - } - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Wait for the allocations to be placed on the other node - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) - require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, drainer.NodeDrainEventDetailDeadlined) } // 
TestDrainer_MultipleNSes_ServiceOnly asserts that all jobs on an alloc, even // when they belong to different namespaces and share the same ID func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create two nodes - n1, n2 := mock.Node(), mock.Node() + // Create a node + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) ns1, ns2 := mock.Namespace(), mock.Namespace() nses := []*structs.Namespace{ns1, ns2} nsReg := &structs.NamespaceUpsertRequest{ Namespaces: nses, WriteRequest: structs.WriteRequest{Region: "global"}, } var nsResp structs.GenericResponse - require.NoError(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp)) for _, ns := range nses { - // Create a job that runs on just one + // Create a job for each namespace job := mock.Job() job.ID = "example" job.Name = "example" @@ -930,36 +688,20 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) } // Wait for the two allocations to be placed - store := s1.State() - testutil.WaitForResult(func() (bool, error) { - iter, err := store.Allocs(nil, state.SortDefault) - if err != nil { - return false, err - } - - count := 0 - for iter.Next() != nil { - count++ - } - if count != 2 { - return false, fmt.Errorf("expected %d allocs, found %d", 2, count) - } - return true, nil - }, func(err error) { - require.NoError(err) - }) + waitForPlacedAllocs(t, store, n1.ID, 2) // Create the second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ @@ -972,45 +714,21 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, store, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, store, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, 
n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - testutil.WaitForResult(func() (bool, error) { - allocs, err := store.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - require.NoError(err) - }) + // Wait for the allocs to be replaced + waitForAllocsStop(t, store, n1.ID, errCh) + waitForPlacedAllocs(t, store, n2.ID, 2) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } - node, err := store.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - require.NoError(err) - }) - - // Check we got the right events - node, err := store.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "") } // Test that transitions to force drain work. @@ -1023,11 +741,11 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { name = "Deadline" } t.Run(name, func(t *testing.T) { - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() // Create a node n1 := mock.Node() @@ -1036,7 +754,7 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Create a batch job bjob := mock.BatchJob() @@ -1051,20 +769,11 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Wait for the allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForPlacedAllocs(t, store, n1.ID, 2) // Pick the deadline deadline := 0 * time.Second @@ -1083,35 +792,37 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, 
ctx, state, codec, n1.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) // Make sure the batch job isn't affected - testutil.AssertUntil(500*time.Millisecond, func() (bool, error) { + must.Wait(t, wait.ContinualSuccess(wait.ErrorFunc(func() error { if err := checkAllocPromoter(errCh); err != nil { - return false, fmt.Errorf("check alloc promoter error: %v", err) + return fmt.Errorf("check alloc promoter error: %v", err) } - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, fmt.Errorf("AllocsByNode error: %v", err) - } + allocs, err := store.AllocsByNode(nil, n1.ID) + must.NoError(t, err) for _, alloc := range allocs { if alloc.DesiredStatus != structs.AllocDesiredStatusRun { - return false, fmt.Errorf("got status %v", alloc.DesiredStatus) + return fmt.Errorf("got status %v", alloc.DesiredStatus) } } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + if len(allocs) != 2 { + return fmt.Errorf("expected 2 allocs but got %d", len(allocs)) + } + return nil + }), + wait.Timeout(500*time.Millisecond), + wait.Gap(50*time.Millisecond), + )) - // Foce drain the node + // Force drain the node drainReq = &structs.NodeUpdateDrainRequest{ NodeID: n1.ID, DrainStrategy: &structs.DrainStrategy{ @@ -1121,42 +832,91 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { }, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec( + codec, "Node.UpdateDrain", drainReq, &drainResp)) // Make sure the batch job is migrated - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - for _, alloc := range allocs { - if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got status %v", alloc.DesiredStatus) - } - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForAllocsStop(t, store, n1.ID, errCh) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 4, + drainer.NodeDrainEventDetailDeadlined) - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 4, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[3].Message) - require.Contains(node.Events[3].Details, drainer.NodeDrainEventDetailDeadlined) }) } } + +// waitForNodeDrainComplete is a test helper that verifies the node drain has +// been removed and that the expected Node events have been written +func waitForNodeDrainComplete(t *testing.T, store *state.StateStore, nodeID string, + errCh chan error, expectEvents int, expectDetail string) { + t.Helper() + + var node *structs.Node + + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + if err := checkAllocPromoter(errCh); err != nil { + return err + } + node, _ = 
store.NodeByID(nil, nodeID) + if node.DrainStrategy != nil { + return fmt.Errorf("has drain strategy still set") + } + // sometimes test gets a duplicate node drain complete event + if len(node.Events) < expectEvents { + return fmt.Errorf( + "did not get enough events (expected %d): %v", expectEvents, node.Events) + } + return nil + }), + wait.Timeout(10*time.Second), + wait.Gap(50*time.Millisecond), + )) + + must.Eq(t, drainer.NodeDrainEventComplete, node.Events[expectEvents-1].Message) + if expectDetail != "" { + must.MapContainsKey(t, node.Events[expectEvents-1].Details, expectDetail, + must.Sprintf("%#v", node.Events[expectEvents-1].Details), + ) + } +} + +// waitForPlacedAllocs waits for the expected number of allocs to be placed on the node +func waitForPlacedAllocs(t *testing.T, store *state.StateStore, nodeID string, count int) { + t.Helper() + must.Wait(t, wait.InitialSuccess( + wait.BoolFunc(func() bool { + allocs, err := store.AllocsByNode(nil, nodeID) + must.NoError(t, err) + return len(allocs) == count + }), + wait.Timeout(10*time.Second), + wait.Gap(50*time.Millisecond), + )) +} + +// waitForAllocsStop waits for all allocs on the node to be stopped +func waitForAllocsStop(t *testing.T, store *state.StateStore, nodeID string, errCh chan error) []*structs.Allocation { + t.Helper() + var finalAllocs []*structs.Allocation + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if err := checkAllocPromoter(errCh); err != nil { + return err + } + + var err error + finalAllocs, err = store.AllocsByNode(nil, nodeID) + must.NoError(t, err) + for _, alloc := range finalAllocs { + if alloc.DesiredStatus != structs.AllocDesiredStatusStop { + return fmt.Errorf("expected stop but got %s", alloc.DesiredStatus) + } + } + return nil + }), + wait.Timeout(10*time.Second), + wait.Gap(50*time.Millisecond), + )) + + return finalAllocs +}
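For reviewers new to github.com/shoenig/test, the standalone sketch below illustrates the must.Wait polling idiom the new helpers are built on: wait.InitialSuccess retries the condition until it passes or the timeout elapses, while wait.ContinualSuccess requires the condition to keep passing for the whole window. The package, test name, and counter here are illustrative only and are not part of this patch.

package example

import (
	"fmt"
	"testing"
	"time"

	"github.com/shoenig/test/must"
	"github.com/shoenig/test/wait"
)

// TestWaitPatternSketch is an illustrative example (not part of this change)
// of the polling idiom used by the drainer test helpers above.
func TestWaitPatternSketch(t *testing.T) {
	attempts := 0

	// InitialSuccess: retry until the function returns nil or the timeout expires.
	must.Wait(t, wait.InitialSuccess(
		wait.ErrorFunc(func() error {
			attempts++
			if attempts < 3 {
				return fmt.Errorf("not ready yet (attempt %d)", attempts)
			}
			return nil
		}),
		wait.Timeout(10*time.Second),
		wait.Gap(50*time.Millisecond),
	))

	// ContinualSuccess: the condition must hold every time it is polled
	// for the entire timeout window.
	must.Wait(t, wait.ContinualSuccess(
		wait.BoolFunc(func() bool { return attempts >= 3 }),
		wait.Timeout(500*time.Millisecond),
		wait.Gap(50*time.Millisecond),
	))
}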