From 80bd521631d9f54dcaaa5317ff5039ed787c6374 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 11 Apr 2023 14:31:13 -0400 Subject: [PATCH] drainer: fix codec race condition in integration test (#16845) msgpackrpc codec handles are specific to a connection and cannot be shared between goroutines; this can cause corrupted decoding. Fix the drainer integration test so that we create separate codecs for the goroutines that the test helper spins up to simulate client updates. This changeset also refactors the drainer integration test to bring it up to current idioms and library usages, make assertions more clear, and reduce duplication. --- nomad/drainer_int_test.go | 850 ++++++++++++++------------------------ 1 file changed, 305 insertions(+), 545 deletions(-) diff --git a/nomad/drainer_int_test.go b/nomad/drainer_int_test.go index a9162c540..76d70e7de 100644 --- a/nomad/drainer_int_test.go +++ b/nomad/drainer_int_test.go @@ -6,15 +6,16 @@ package nomad import ( "context" "fmt" - "net/rpc" "testing" "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/hashicorp/nomad/ci" + "github.com/shoenig/test/must" + "github.com/shoenig/test/wait" + "github.com/hashicorp/nomad/ci" "github.com/hashicorp/nomad/helper/pointer" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/drainer" @@ -22,16 +23,21 @@ import ( "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" - "github.com/stretchr/testify/require" ) -func allocPromoter(errCh chan<- error, ctx context.Context, - state *state.StateStore, codec rpc.ClientCodec, nodeID string, - logger log.Logger) { +// allocClientStateSimulator simulates the updates in state from the +// client. allocations that are new on the server get marked with healthy +// deployments, and allocations that are DesiredStatus=stop on the server get +// updates with terminal client status. +func allocClientStateSimulator(t *testing.T, errCh chan<- error, ctx context.Context, + srv *Server, nodeID string, logger log.Logger) { + + codec := rpcClient(t, srv) + store := srv.State() nindex := uint64(1) for { - allocs, index, err := getNodeAllocs(ctx, state, nodeID, nindex) + allocs, index, err := getNodeAllocs(ctx, store, nodeID, nindex) if err != nil { if err == context.Canceled { return @@ -51,7 +57,7 @@ func allocPromoter(errCh chan<- error, ctx context.Context, } if alloc.DeploymentStatus.HasHealth() { - continue + continue // only update to healthy once } newAlloc := alloc.Copy() newAlloc.DeploymentStatus = &structs.AllocDeploymentStatus{ @@ -59,7 +65,7 @@ func allocPromoter(errCh chan<- error, ctx context.Context, Timestamp: now, } updates = append(updates, newAlloc) - logger.Trace("marked deployment health for alloc", "alloc_id", alloc.ID) + logger.Trace("marking deployment health for alloc", "alloc_id", alloc.ID) } if len(updates) == 0 { @@ -83,7 +89,7 @@ func allocPromoter(errCh chan<- error, ctx context.Context, } // checkAllocPromoter is a small helper to return an error or nil from an error -// chan like the one given to the allocPromoter goroutine. +// chan like the one given to the allocClientStateSimulator goroutine. 
func checkAllocPromoter(errCh chan error) error { select { case err := <-errCh: @@ -93,8 +99,8 @@ func checkAllocPromoter(errCh chan error) error { } } -func getNodeAllocs(ctx context.Context, state *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) { - resp, index, err := state.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx) +func getNodeAllocs(ctx context.Context, store *state.StateStore, nodeID string, index uint64) ([]*structs.Allocation, uint64, error) { + resp, index, err := store.BlockingQuery(getNodeAllocsImpl(nodeID), index, ctx) if err != nil { return nil, 0, err } @@ -105,16 +111,16 @@ func getNodeAllocs(ctx context.Context, state *state.StateStore, nodeID string, return resp.([]*structs.Allocation), index, nil } -func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { - return func(ws memdb.WatchSet, state *state.StateStore) (interface{}, uint64, error) { +func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) { + return func(ws memdb.WatchSet, store *state.StateStore) (interface{}, uint64, error) { // Capture all the allocations - allocs, err := state.AllocsByNode(ws, nodeID) + allocs, err := store.AllocsByNode(ws, nodeID) if err != nil { return nil, 0, err } // Use the last index that affected the jobs table - index, err := state.Index("allocs") + index, err := store.Index("allocs") if err != nil { return nil, index, err } @@ -125,23 +131,23 @@ func getNodeAllocsImpl(nodeID string) func(ws memdb.WatchSet, state *state.State func TestDrainer_Simple_ServiceOnly(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create two nodes - n1, n2 := mock.Node(), mock.Node() + // Create a node + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Create a job that runs on just one + // Create a job that runs on that node job := mock.Job() job.TaskGroups[0].Count = 2 req := &structs.JobRegisterRequest{ @@ -151,30 +157,20 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Wait for the two allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForPlacedAllocs(t, store, n1.ID, 2) // Create the second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, 
"Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ @@ -187,55 +183,31 @@ func TestDrainer_Simple_ServiceOnly(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for the allocs to be replaced + waitForAllocsStop(t, store, n1.ID, nil) + waitForPlacedAllocs(t, store, n2.ID, 2) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "") } func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() // Create a node n1 := mock.Node() @@ -244,9 +216,9 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Create a job that runs on just one + // Create a job that runs on it job := mock.Job() job.Update = *structs.DefaultUpdateStrategy job.Update.Stagger = 30 * time.Second @@ -258,23 +230,12 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // 
Wait for the two allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForPlacedAllocs(t, store, n1.ID, 2) // Drain the node drainReq := &structs.NodeUpdateDrainRequest{ @@ -287,61 +248,32 @@ func TestDrainer_Simple_ServiceOnly_Deadline(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be stopped - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - for _, alloc := range allocs { - if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) - } - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for the allocs to be stopped (but not replaced) + waitForAllocsStop(t, store, n1.ID, nil) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) - require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, nil, 3, drainer.NodeDrainEventDetailDeadlined) } func TestDrainer_DrainEmptyNode(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create a node + // Create an empty node n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Drain the node drainReq := &structs.NodeUpdateDrainRequest{ @@ -354,47 +286,31 @@ func TestDrainer_DrainEmptyNode(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Check that the node drain is removed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - 
if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, nil, 3, "") } func TestDrainer_AllTypes_Deadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create two nodes, registering the second later - n1, n2 := mock.Node(), mock.Node() + // Create a node + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Create a service job that runs on just one + // Create a service job that runs on it job := mock.Job() job.TaskGroups[0].Count = 2 req := &structs.JobRegisterRequest{ @@ -404,11 +320,9 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Create a system job sysjob := mock.SystemJob() @@ -419,10 +333,8 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Create a batch job bjob := mock.BatchJob() @@ -434,31 +346,21 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { Namespace: job.Namespace, }, } + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + // Wait for all the allocations to be placed + waitForPlacedAllocs(t, store, n1.ID, 5) - // Wait for the allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Create the second node + // Create a second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, 
&nodeResp)) - // Drain the node + // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ NodeID: n1.ID, DrainStrategy: &structs.DrainStrategy{ @@ -469,58 +371,18 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - // Wait for the allocs to be stopped - var finalAllocs []*structs.Allocation - testutil.WaitForResult(func() (bool, error) { - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } - - var err error - finalAllocs, err = state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - for _, alloc := range finalAllocs { - if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) - } - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Wait for the allocations to be placed on the other node - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for allocs to be replaced + finalAllocs := waitForAllocsStop(t, store, n1.ID, nil) + waitForPlacedAllocs(t, store, n2.ID, 5) // Assert that the service finished before the batch and system var serviceMax, batchMax uint64 = 0, 0 @@ -531,37 +393,32 @@ func TestDrainer_AllTypes_Deadline(t *testing.T) { batchMax = alloc.ModifyIndex } } - require.True(serviceMax < batchMax) + must.Less(t, batchMax, serviceMax) - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) - require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, nil, 3, drainer.NodeDrainEventDetailDeadlined) } // Test that drain is unset when batch jobs naturally finish func TestDrainer_AllTypes_NoDeadline(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, 
srv.RPC) + store := srv.State() // Create two nodes, registering the second later - n1, n2 := mock.Node(), mock.Node() + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Create a service job that runs on just one + // Create a service job job := mock.Job() job.TaskGroups[0].Count = 2 req := &structs.JobRegisterRequest{ @@ -571,11 +428,9 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Create a system job sysjob := mock.SystemJob() @@ -586,10 +441,8 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Create a batch job bjob := mock.BatchJob() @@ -601,31 +454,21 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { Namespace: job.Namespace, }, } + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + // Wait for all the allocations to be placed + waitForPlacedAllocs(t, store, n1.ID, 5) - // Wait for the allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Create the second node + // Create a second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Drain the node + // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ NodeID: n1.ID, DrainStrategy: &structs.DrainStrategy{ @@ -636,40 +479,36 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - // Wait for the service allocs to be stopped on the draining 
node - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByJob(nil, job.Namespace, job.ID, false) - if err != nil { - return false, err - } + // Wait for the service allocs (only) to be stopped on the draining node + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + allocs, err := store.AllocsByJob(nil, job.Namespace, job.ID, false) + must.NoError(t, err) for _, alloc := range allocs { if alloc.NodeID != n1.ID { continue } if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) + return fmt.Errorf("got desired status %v", alloc.DesiredStatus) } } - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) + return checkAllocPromoter(errCh) + }), + wait.Timeout(10*time.Second), + wait.Gap(100*time.Millisecond), + )) // Mark the batch allocations as finished - allocs, err := state.AllocsByJob(nil, job.Namespace, bjob.ID, false) - require.Nil(err) + allocs, err := store.AllocsByJob(nil, job.Namespace, bjob.ID, false) + must.NoError(t, err) var updates []*structs.Allocation for _, alloc := range allocs { @@ -677,56 +516,34 @@ func TestDrainer_AllTypes_NoDeadline(t *testing.T) { new.ClientStatus = structs.AllocClientStatusComplete updates = append(updates, new) } - require.Nil(state.UpdateAllocsFromClient(structs.MsgTypeTestSetup, 1000, updates)) + index, _ := store.LatestIndex() + index++ + must.NoError(t, store.UpdateAllocsFromClient(structs.MsgTypeTestSetup, index, updates)) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for the service allocations to be replaced + waitForPlacedAllocs(t, store, n2.ID, 3) - // Wait for the service allocations to be placed on the other node - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 3, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "") } func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create two nodes, registering the second later - n1, n2 := mock.Node(), mock.Node() + // Create a node + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + 
must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Create a service job that runs on just one job := mock.Job() @@ -738,11 +555,9 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) job.CreateIndex = resp.JobModifyIndex // Create a system job @@ -754,10 +569,8 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) sysjob.CreateIndex = resp.JobModifyIndex // Create a batch job @@ -770,23 +583,12 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { Namespace: job.Namespace, }, } - - // Fetch the response - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) bjob.CreateIndex = resp.JobModifyIndex // Wait for the allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForPlacedAllocs(t, store, n1.ID, 5) // Create some old terminal allocs for each job that point at a non-existent // node to simulate it being on a GC'd node. 
@@ -802,16 +604,17 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete badAllocs = append(badAllocs, alloc) } - require.NoError(state.UpsertAllocs(structs.MsgTypeTestSetup, 1, badAllocs)) + must.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, 1, badAllocs)) // Create the second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - // Drain the node + // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ NodeID: n1.ID, DrainStrategy: &structs.DrainStrategy{ @@ -822,99 +625,54 @@ func TestDrainer_AllTypes_Deadline_GarbageCollectedNode(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, state, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, state, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - // Wait for the allocs to be stopped - var finalAllocs []*structs.Allocation - testutil.WaitForResult(func() (bool, error) { - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } + // Wait for the allocs to be replaced + waitForAllocsStop(t, store, n1.ID, errCh) + waitForPlacedAllocs(t, store, n2.ID, 5) - var err error - finalAllocs, err = state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - for _, alloc := range finalAllocs { - if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got desired status %v", alloc.DesiredStatus) - } - } - return true, nil - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Wait for the allocations to be placed on the other node - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 5, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) - - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) - require.Contains(node.Events[2].Details, drainer.NodeDrainEventDetailDeadlined) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, drainer.NodeDrainEventDetailDeadlined) } // 
TestDrainer_MultipleNSes_ServiceOnly asserts that all jobs on an alloc, even // when they belong to different namespaces and share the same ID func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { ci.Parallel(t) - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() - // Create two nodes - n1, n2 := mock.Node(), mock.Node() + // Create a node + n1 := mock.Node() nodeReg := &structs.NodeRegisterRequest{ Node: n1, WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.NoError(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) - ns1, ns2 := mock.Namespace(), mock.Namespace() - nses := []*structs.Namespace{ns1, ns2} + nsrv, ns2 := mock.Namespace(), mock.Namespace() + nses := []*structs.Namespace{nsrv, ns2} nsReg := &structs.NamespaceUpsertRequest{ Namespaces: nses, WriteRequest: structs.WriteRequest{Region: "global"}, } var nsResp structs.GenericResponse - require.NoError(msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Namespace.UpsertNamespaces", nsReg, &nsResp)) for _, ns := range nses { - // Create a job that runs on just one + // Create a job for each namespace job := mock.Job() job.ID = "example" job.Name = "example" @@ -930,36 +688,20 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) } // Wait for the two allocations to be placed - store := s1.State() - testutil.WaitForResult(func() (bool, error) { - iter, err := store.Allocs(nil, state.SortDefault) - if err != nil { - return false, err - } - - count := 0 - for iter.Next() != nil { - count++ - } - if count != 2 { - return false, fmt.Errorf("expected %d allocs, found %d", 2, count) - } - return true, nil - }, func(err error) { - require.NoError(err) - }) + waitForPlacedAllocs(t, store, n1.ID, 2) // Create the second node + n2 := mock.Node() nodeReg = &structs.NodeRegisterRequest{ Node: n2, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Drain the first node drainReq := &structs.NodeUpdateDrainRequest{ @@ -972,45 +714,21 @@ func TestDrainer_MultipleNSes_ServiceOnly(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 2) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, ctx, store, codec, n1.ID, s1.logger) - go allocPromoter(errCh, ctx, store, codec, n2.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, 
n1.ID, srv.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n2.ID, srv.logger) - testutil.WaitForResult(func() (bool, error) { - allocs, err := store.AllocsByNode(nil, n2.ID) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - require.NoError(err) - }) + // Wait for the allocs to be replaced + waitForAllocsStop(t, store, n1.ID, errCh) + waitForPlacedAllocs(t, store, n2.ID, 2) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - if err := checkAllocPromoter(errCh); err != nil { - return false, err - } - node, err := store.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - require.NoError(err) - }) - - // Check we got the right events - node, err := store.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 3, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[2].Message) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 3, "") } // Test that transitions to force drain work. @@ -1023,11 +741,11 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { name = "Deadline" } t.Run(name, func(t *testing.T) { - require := require.New(t) - s1, cleanupS1 := TestServer(t, nil) - defer cleanupS1() - codec := rpcClient(t, s1) - testutil.WaitForLeader(t, s1.RPC) + srv, cleanupSrv := TestServer(t, nil) + defer cleanupSrv() + codec := rpcClient(t, srv) + testutil.WaitForLeader(t, srv.RPC) + store := srv.State() // Create a node n1 := mock.Node() @@ -1036,7 +754,7 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var nodeResp structs.NodeUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.Register", nodeReg, &nodeResp)) // Create a batch job bjob := mock.BatchJob() @@ -1051,20 +769,11 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { // Fetch the response var resp structs.JobRegisterResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) - require.NotZero(resp.Index) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Job.Register", req, &resp)) + must.Positive(t, resp.Index) // Wait for the allocations to be placed - state := s1.State() - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForPlacedAllocs(t, store, n1.ID, 2) // Pick the deadline deadline := 0 * time.Second @@ -1083,35 +792,37 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { WriteRequest: structs.WriteRequest{Region: "global"}, } var drainResp structs.NodeDrainUpdateResponse - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) - // Wait for the allocs to be replaced + // Setup client simulator errCh := make(chan error, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go allocPromoter(errCh, 
ctx, state, codec, n1.ID, s1.logger) + go allocClientStateSimulator(t, errCh, ctx, srv, n1.ID, srv.logger) // Make sure the batch job isn't affected - testutil.AssertUntil(500*time.Millisecond, func() (bool, error) { + must.Wait(t, wait.ContinualSuccess(wait.ErrorFunc(func() error { if err := checkAllocPromoter(errCh); err != nil { - return false, fmt.Errorf("check alloc promoter error: %v", err) + return fmt.Errorf("check alloc promoter error: %v", err) } - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, fmt.Errorf("AllocsByNode error: %v", err) - } + allocs, err := store.AllocsByNode(nil, n1.ID) + must.NoError(t, err) for _, alloc := range allocs { if alloc.DesiredStatus != structs.AllocDesiredStatusRun { - return false, fmt.Errorf("got status %v", alloc.DesiredStatus) + return fmt.Errorf("got status %v", alloc.DesiredStatus) } } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + if len(allocs) != 2 { + return fmt.Errorf("expected 2 allocs but got %d", len(allocs)) + } + return nil + }), + wait.Timeout(500*time.Millisecond), + wait.Gap(50*time.Millisecond), + )) - // Foce drain the node + // Force drain the node drainReq = &structs.NodeUpdateDrainRequest{ NodeID: n1.ID, DrainStrategy: &structs.DrainStrategy{ @@ -1121,42 +832,91 @@ func TestDrainer_Batch_TransitionToForce(t *testing.T) { }, WriteRequest: structs.WriteRequest{Region: "global"}, } - require.Nil(msgpackrpc.CallWithCodec(codec, "Node.UpdateDrain", drainReq, &drainResp)) + must.NoError(t, msgpackrpc.CallWithCodec( + codec, "Node.UpdateDrain", drainReq, &drainResp)) // Make sure the batch job is migrated - testutil.WaitForResult(func() (bool, error) { - allocs, err := state.AllocsByNode(nil, n1.ID) - if err != nil { - return false, err - } - for _, alloc := range allocs { - if alloc.DesiredStatus != structs.AllocDesiredStatusStop { - return false, fmt.Errorf("got status %v", alloc.DesiredStatus) - } - } - return len(allocs) == 2, fmt.Errorf("got %d allocs", len(allocs)) - }, func(err error) { - t.Fatalf("err: %v", err) - }) + waitForAllocsStop(t, store, n1.ID, errCh) - // Check that the node drain is removed - testutil.WaitForResult(func() (bool, error) { - node, err := state.NodeByID(nil, n1.ID) - if err != nil { - return false, err - } - return node.DrainStrategy == nil, fmt.Errorf("has drain strategy still set") - }, func(err error) { - t.Fatalf("err: %v", err) - }) + // Wait for the node drain to be marked complete with the events we expect + waitForNodeDrainComplete(t, store, n1.ID, errCh, 4, + drainer.NodeDrainEventDetailDeadlined) - // Check we got the right events - node, err := state.NodeByID(nil, n1.ID) - require.NoError(err) - // sometimes test gets a duplicate node drain complete event - require.GreaterOrEqualf(len(node.Events), 4, "unexpected number of events: %v", node.Events) - require.Equal(drainer.NodeDrainEventComplete, node.Events[3].Message) - require.Contains(node.Events[3].Details, drainer.NodeDrainEventDetailDeadlined) }) } } + +// waitForNodeDrainComplete is a test helper that verifies the node drain has +// been removed and that the expected Node events have been written +func waitForNodeDrainComplete(t *testing.T, store *state.StateStore, nodeID string, + errCh chan error, expectEvents int, expectDetail string) { + t.Helper() + + var node *structs.Node + + must.Wait(t, wait.InitialSuccess(wait.ErrorFunc(func() error { + if err := checkAllocPromoter(errCh); err != nil { + return err + } + node, _ = 
store.NodeByID(nil, nodeID) + if node.DrainStrategy != nil { + return fmt.Errorf("has drain strategy still set") + } + // sometimes test gets a duplicate node drain complete event + if len(node.Events) < expectEvents { + return fmt.Errorf( + "did not get enough events (expected %d): %v", expectEvents, node.Events) + } + return nil + }), + wait.Timeout(10*time.Second), + wait.Gap(50*time.Millisecond), + )) + + must.Eq(t, drainer.NodeDrainEventComplete, node.Events[expectEvents-1].Message) + if expectDetail != "" { + must.MapContainsKey(t, node.Events[expectEvents-1].Details, expectDetail, + must.Sprintf("%#v", node.Events[expectEvents-1].Details), + ) + } +} + +func waitForPlacedAllocs(t *testing.T, store *state.StateStore, nodeID string, count int) { + t.Helper() + must.Wait(t, wait.InitialSuccess( + wait.BoolFunc(func() bool { + allocs, err := store.AllocsByNode(nil, nodeID) + must.NoError(t, err) + return len(allocs) == count + }), + wait.Timeout(10*time.Second), + wait.Gap(50*time.Millisecond), + )) +} + +// waitForAllocsStop waits for all allocs on the node to be stopped +func waitForAllocsStop(t *testing.T, store *state.StateStore, nodeID string, errCh chan error) []*structs.Allocation { + t.Helper() + var finalAllocs []*structs.Allocation + must.Wait(t, wait.InitialSuccess( + wait.ErrorFunc(func() error { + if err := checkAllocPromoter(errCh); err != nil { + return err + } + + var err error + finalAllocs, err = store.AllocsByNode(nil, nodeID) + must.NoError(t, err) + for _, alloc := range finalAllocs { + if alloc.DesiredStatus != structs.AllocDesiredStatusStop { + return fmt.Errorf("expected stop but got %s", alloc.DesiredStatus) + } + } + return nil + }), + wait.Timeout(10*time.Second), + wait.Gap(50*time.Millisecond), + )) + + return finalAllocs +}
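
The core of the fix, restated as a standalone sketch rather than code taken from the patch: every goroutine that issues RPCs builds its own codec through the package's rpcClient test helper instead of reusing one created by the caller, because a msgpackrpc codec handle is tied to a single connection and interleaved use from multiple goroutines can corrupt decoding. The simulateClients wrapper below is hypothetical; rpcClient, Server, and the request/response structs are the same ones exercised in the diff above.

// simulateClients is a hypothetical illustration of the per-goroutine codec
// pattern applied in this changeset; it is not part of the patch.
func simulateClients(t *testing.T, srv *Server, nodes []*structs.Node, errCh chan<- error) {
	for _, node := range nodes {
		node := node
		go func() {
			// Each goroutine builds its own codec: msgpackrpc codec handles are
			// specific to a connection and cannot be shared between goroutines.
			codec := rpcClient(t, srv)

			req := &structs.NodeRegisterRequest{
				Node:         node,
				WriteRequest: structs.WriteRequest{Region: "global"},
			}
			var resp structs.NodeUpdateResponse
			if err := msgpackrpc.CallWithCodec(codec, "Node.Register", req, &resp); err != nil {
				// Report failures on a channel, as the test helpers above do,
				// rather than failing the test from a goroutine.
				errCh <- err
			}
		}()
	}
}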