mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 02:45:42 +03:00
Fix raft tests
Wait until leadership stabalizes and all non-voters get promoted before killing leader
This commit is contained in:
@@ -263,11 +263,7 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
|
||||
// Join the servers to s1
|
||||
TestJoin(t, s1, s2, s3)
|
||||
|
||||
for _, s := range servers {
|
||||
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
|
||||
}
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
waitForStableLeadership(t, servers)
|
||||
|
||||
// Add s4 to peers directly
|
||||
addr := fmt.Sprintf("127.0.0.1:%d", s4.config.RPCAddr.Port)
|
||||
|
||||
@@ -221,28 +221,7 @@ func TestHeartbeat_Server_HeartbeatTTL_Failover(t *testing.T) {
|
||||
servers := []*Server{s1, s2, s3}
|
||||
TestJoin(t, s1, s2, s3)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
peers, _ := s1.numPeers()
|
||||
return peers == 3, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 3 peers")
|
||||
})
|
||||
|
||||
// Find the leader
|
||||
var leader *Server
|
||||
for _, s := range servers {
|
||||
// Check that s.heartbeatTimers is empty
|
||||
if len(s.heartbeatTimers) != 0 {
|
||||
t.Fatalf("should have no heartbeatTimers")
|
||||
}
|
||||
// Find the leader too
|
||||
if s.IsLeader() {
|
||||
leader = s
|
||||
}
|
||||
}
|
||||
if leader == nil {
|
||||
t.Fatalf("Should have a leader")
|
||||
}
|
||||
leader := waitForStableLeadership(t, servers)
|
||||
codec := rpcClient(t, leader)
|
||||
|
||||
// Create the register request
|
||||
|
||||
@@ -174,25 +174,7 @@ func TestLeader_PlanQueue_Reset(t *testing.T) {
|
||||
servers := []*Server{s1, s2, s3}
|
||||
TestJoin(t, s1, s2, s3)
|
||||
|
||||
for _, s := range servers {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
peers, _ := s.numPeers()
|
||||
return peers == 3, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 3 peers")
|
||||
})
|
||||
}
|
||||
|
||||
var leader *Server
|
||||
for _, s := range servers {
|
||||
if s.IsLeader() {
|
||||
leader = s
|
||||
break
|
||||
}
|
||||
}
|
||||
if leader == nil {
|
||||
t.Fatalf("Should have a leader")
|
||||
}
|
||||
leader := waitForStableLeadership(t, servers)
|
||||
|
||||
if !leader.planQueue.Enabled() {
|
||||
t.Fatalf("should enable plan queue")
|
||||
@@ -249,27 +231,8 @@ func TestLeader_EvalBroker_Reset(t *testing.T) {
|
||||
defer s3.Shutdown()
|
||||
servers := []*Server{s1, s2, s3}
|
||||
TestJoin(t, s1, s2, s3)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
for _, s := range servers {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
peers, _ := s.numPeers()
|
||||
return peers == 3, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 3 peers")
|
||||
})
|
||||
}
|
||||
|
||||
var leader *Server
|
||||
for _, s := range servers {
|
||||
if s.IsLeader() {
|
||||
leader = s
|
||||
break
|
||||
}
|
||||
}
|
||||
if leader == nil {
|
||||
t.Fatalf("Should have a leader")
|
||||
}
|
||||
leader := waitForStableLeadership(t, servers)
|
||||
|
||||
// Inject a pending eval
|
||||
req := structs.EvalUpdateRequest{
|
||||
@@ -326,27 +289,8 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) {
|
||||
defer s3.Shutdown()
|
||||
servers := []*Server{s1, s2, s3}
|
||||
TestJoin(t, s1, s2, s3)
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
for _, s := range servers {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
peers, _ := s.numPeers()
|
||||
return peers == 3, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("should have 3 peers")
|
||||
})
|
||||
}
|
||||
|
||||
var leader *Server
|
||||
for _, s := range servers {
|
||||
if s.IsLeader() {
|
||||
leader = s
|
||||
break
|
||||
}
|
||||
}
|
||||
if leader == nil {
|
||||
t.Fatalf("Should have a leader")
|
||||
}
|
||||
leader := waitForStableLeadership(t, servers)
|
||||
|
||||
// Inject a periodic job, a parameterized periodic job and a non-periodic job
|
||||
periodic := mock.PeriodicJob()
|
||||
@@ -1229,3 +1173,58 @@ func TestServer_ReconcileMember(t *testing.T) {
|
||||
t.Fatalf("got %d server ids want %d", got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// waitForStableLeadership waits until a leader is elected and all servers
|
||||
// get promoted as voting members, returns the leader
|
||||
func waitForStableLeadership(t *testing.T, servers []*Server) *Server {
|
||||
nPeers := len(servers)
|
||||
|
||||
// wait for all servers to discover each other
|
||||
for _, s := range servers {
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
peers, _ := s.numPeers()
|
||||
return peers == 3, fmt.Errorf("should find %d peers but found %d", nPeers, peers)
|
||||
}, func(err error) {
|
||||
require.NoError(t, err)
|
||||
})
|
||||
}
|
||||
|
||||
// wait for leader
|
||||
var leader *Server
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
for _, s := range servers {
|
||||
if s.IsLeader() {
|
||||
leader = s
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, fmt.Errorf("no leader found")
|
||||
}, func(err error) {
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
// wait for all servers get marked as voters
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
future := leader.raft.GetConfiguration()
|
||||
if err := future.Error(); err != nil {
|
||||
return false, fmt.Errorf("failed to get raft config: %v", future.Error())
|
||||
}
|
||||
ss := future.Configuration().Servers
|
||||
if len(ss) != len(servers) {
|
||||
return false, fmt.Errorf("raft doesn't contain all servers. Expected %d but found %d", len(servers), len(ss))
|
||||
}
|
||||
|
||||
for _, s := range ss {
|
||||
if s.Suffrage != raft.Voter {
|
||||
return false, fmt.Errorf("configuration has non voting server: %v", s)
|
||||
}
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
require.NoError(t, err)
|
||||
})
|
||||
|
||||
return leader
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user