diff --git a/nomad/autopilot_test.go b/nomad/autopilot_test.go index a4e2f5d90..23a229b99 100644 --- a/nomad/autopilot_test.go +++ b/nomad/autopilot_test.go @@ -263,11 +263,7 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) { // Join the servers to s1 TestJoin(t, s1, s2, s3) - for _, s := range servers { - retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) - } - - testutil.WaitForLeader(t, s1.RPC) + waitForStableLeadership(t, servers) // Add s4 to peers directly addr := fmt.Sprintf("127.0.0.1:%d", s4.config.RPCAddr.Port) diff --git a/nomad/heartbeat_test.go b/nomad/heartbeat_test.go index 0e9e56ffb..c27a377d8 100644 --- a/nomad/heartbeat_test.go +++ b/nomad/heartbeat_test.go @@ -221,28 +221,7 @@ func TestHeartbeat_Server_HeartbeatTTL_Failover(t *testing.T) { servers := []*Server{s1, s2, s3} TestJoin(t, s1, s2, s3) - testutil.WaitForResult(func() (bool, error) { - peers, _ := s1.numPeers() - return peers == 3, nil - }, func(err error) { - t.Fatalf("should have 3 peers") - }) - - // Find the leader - var leader *Server - for _, s := range servers { - // Check that s.heartbeatTimers is empty - if len(s.heartbeatTimers) != 0 { - t.Fatalf("should have no heartbeatTimers") - } - // Find the leader too - if s.IsLeader() { - leader = s - } - } - if leader == nil { - t.Fatalf("Should have a leader") - } + leader := waitForStableLeadership(t, servers) codec := rpcClient(t, leader) // Create the register request diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 19ad39df8..2ffceb053 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -174,25 +174,7 @@ func TestLeader_PlanQueue_Reset(t *testing.T) { servers := []*Server{s1, s2, s3} TestJoin(t, s1, s2, s3) - for _, s := range servers { - testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }, func(err error) { - t.Fatalf("should have 3 peers") - }) - } - - var leader *Server - for _, s := range servers { - if s.IsLeader() { - leader = s - break - } - } - if leader == nil { - t.Fatalf("Should have a leader") - } + leader := waitForStableLeadership(t, servers) if !leader.planQueue.Enabled() { t.Fatalf("should enable plan queue") @@ -249,27 +231,8 @@ func TestLeader_EvalBroker_Reset(t *testing.T) { defer s3.Shutdown() servers := []*Server{s1, s2, s3} TestJoin(t, s1, s2, s3) - testutil.WaitForLeader(t, s1.RPC) - for _, s := range servers { - testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }, func(err error) { - t.Fatalf("should have 3 peers") - }) - } - - var leader *Server - for _, s := range servers { - if s.IsLeader() { - leader = s - break - } - } - if leader == nil { - t.Fatalf("Should have a leader") - } + leader := waitForStableLeadership(t, servers) // Inject a pending eval req := structs.EvalUpdateRequest{ @@ -326,27 +289,8 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) { defer s3.Shutdown() servers := []*Server{s1, s2, s3} TestJoin(t, s1, s2, s3) - testutil.WaitForLeader(t, s1.RPC) - for _, s := range servers { - testutil.WaitForResult(func() (bool, error) { - peers, _ := s.numPeers() - return peers == 3, nil - }, func(err error) { - t.Fatalf("should have 3 peers") - }) - } - - var leader *Server - for _, s := range servers { - if s.IsLeader() { - leader = s - break - } - } - if leader == nil { - t.Fatalf("Should have a leader") - } + leader := waitForStableLeadership(t, servers) // Inject a periodic job, a parameterized periodic job and a non-periodic job periodic := mock.PeriodicJob() @@ -1229,3 +1173,58 @@ func TestServer_ReconcileMember(t *testing.T) { t.Fatalf("got %d server ids want %d", got, want) } } + +// waitForStableLeadership waits until a leader is elected and all servers +// get promoted as voting members, returns the leader +func waitForStableLeadership(t *testing.T, servers []*Server) *Server { + nPeers := len(servers) + + // wait for all servers to discover each other + for _, s := range servers { + testutil.WaitForResult(func() (bool, error) { + peers, _ := s.numPeers() + return peers == 3, fmt.Errorf("should find %d peers but found %d", nPeers, peers) + }, func(err error) { + require.NoError(t, err) + }) + } + + // wait for leader + var leader *Server + testutil.WaitForResult(func() (bool, error) { + for _, s := range servers { + if s.IsLeader() { + leader = s + return true, nil + } + } + + return false, fmt.Errorf("no leader found") + }, func(err error) { + require.NoError(t, err) + }) + + // wait for all servers get marked as voters + testutil.WaitForResult(func() (bool, error) { + future := leader.raft.GetConfiguration() + if err := future.Error(); err != nil { + return false, fmt.Errorf("failed to get raft config: %v", future.Error()) + } + ss := future.Configuration().Servers + if len(ss) != len(servers) { + return false, fmt.Errorf("raft doesn't contain all servers. Expected %d but found %d", len(servers), len(ss)) + } + + for _, s := range ss { + if s.Suffrage != raft.Voter { + return false, fmt.Errorf("configuration has non voting server: %v", s) + } + } + + return true, nil + }, func(err error) { + require.NoError(t, err) + }) + + return leader +}