autopilot: deflake tests (#14475)

Includes:

* Remove the leader upgrade raft version test, as older versions of raft are now
  incompatible with our autopilot library.

* Remove the attempt to assert initial non-voter status in the `PromoteNonVoter`
  test, as the promotion happens too quickly to detect reliably.

* Unskip some previously-skipped tests which we should make stable.

* Remove the `consul/sdk` retry helper from these tests; it uses panic recovery
  in a clever (if gross) way to reduce lines of code, but it appears to introduce
  timing issues in the process (see the polling sketch after this list).

* Add more test step logging and reduce logging noise from the scheduler
  goroutines to make it easier to debug failing tests.

* Be more consistent about using the `waitForStableLeadership` helper so that we
  can assert the cluster is fully stable, not just that we've added peers (a
  usage sketch also follows this list).
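For context, here is a minimal sketch of the polling pattern these tests move to. The `testutil.WaitForResultUntil` signature and the `must.NoError` assertion are taken from the diff below; the test name and the trivial time-based condition are hypothetical stand-ins for a real cluster check such as `wantPeers`.

```go
package nomad

import (
	"fmt"
	"testing"
	"time"

	"github.com/hashicorp/nomad/testutil"
	"github.com/shoenig/test/must"
)

// TestPollingPattern is a hypothetical example of the polling style used in
// place of the consul/sdk retry helper: keep returning (false, err) until the
// condition is met, and let the failure callback report the last error seen
// once the timeout elapses.
func TestPollingPattern(t *testing.T) {
	start := time.Now()

	testutil.WaitForResultUntil(10*time.Second,
		func() (bool, error) {
			// Stand-in for a real cluster condition such as wantPeers(s, 3).
			if time.Since(start) < 100*time.Millisecond {
				return false, fmt.Errorf("condition not met yet")
			}
			return true, nil
		},
		func(err error) { must.NoError(t, err) },
	)
}
```

Unlike `retry.Run`, this helper does not rely on panics to unwind the check, which is the timing behavior the commit is trying to avoid.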
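And a sketch of how the `waitForStableLeadership` helper is used in the updated tests. `TestServer`, `TestJoin`, `Config`, `Server`, and `waitForStableLeadership` all appear in the diffs below; the test name here is illustrative only. The helper blocks until the servers agree on a single leader and the cluster has settled, then returns that leader.

```go
package nomad

import "testing"

// TestStableLeadershipPattern is an illustrative skeleton of the pattern the
// commit standardizes on: join the servers, then assert full stability (not
// just peer count) before making further changes to the cluster.
func TestStableLeadershipPattern(t *testing.T) {
	conf := func(c *Config) {
		c.NumSchedulers = 0 // reduces test log noise
		c.BootstrapExpect = 3
	}
	s1, cleanupS1 := TestServer(t, conf)
	defer cleanupS1()
	s2, cleanupS2 := TestServer(t, conf)
	defer cleanupS2()
	s3, cleanupS3 := TestServer(t, conf)
	defer cleanupS3()

	servers := []*Server{s1, s2, s3}
	TestJoin(t, servers...)

	// Blocks until the cluster has settled on one leader; returns it.
	t.Logf("waiting for initial stable cluster")
	leader := waitForStableLeadership(t, servers)

	// Knowing the leader lets a test kill only a follower, as the cleanup
	// and rolling-update tests below do.
	t.Logf("leader is %s", leader.config.NodeID)
}
```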
Authored by Tim Gross on 2022-09-07 09:35:01 -04:00; committed by GitHub.
parent 05908e3b1a
commit 534869eb67
3 changed files with 117 additions and 367 deletions

View File

@@ -85,7 +85,10 @@ func (d *AutopilotDelegate) RemoveFailedServer(failedSrv *autopilot.Server) {
go func() {
err := d.server.RemoveFailedNode(failedSrv.Name)
if err != nil {
d.server.logger.Error("could not remove failed server", "server", string(failedSrv.ID))
d.server.logger.Error("could not remove failed server",
"server", string(failedSrv.ID),
"error", err,
)
}
}()
}

View File

@@ -5,11 +5,10 @@ import (
"testing"
"time"
// TODO: replace this with our own helper
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/raft"
autopilot "github.com/hashicorp/raft-autopilot"
"github.com/hashicorp/serf/serf"
"github.com/shoenig/test/must"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/testutil"
@@ -38,53 +37,13 @@ func wantPeers(s *Server, peers int) error {
return nil
}
// wantRaft determines if the servers have all of each other in their
// Raft configurations,
func wantRaft(servers []*Server) error {
// Make sure all the servers are represented in the Raft config,
// and that there are no extras.
verifyRaft := func(c raft.Configuration) error {
want := make(map[raft.ServerID]bool)
for _, s := range servers {
want[s.config.RaftConfig.LocalID] = true
}
found := make([]raft.ServerID, 0, len(c.Servers))
for _, s := range c.Servers {
found = append(found, s.ID)
if !want[s.ID] {
return fmt.Errorf("don't want %q", s.ID)
}
delete(want, s.ID)
}
if len(want) > 0 {
return fmt.Errorf("didn't find %v in %#+v", want, found)
}
return nil
}
for _, s := range servers {
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return err
}
if err := verifyRaft(future.Configuration()); err != nil {
return err
}
}
return nil
}
func TestAutopilot_CleanupDeadServer(t *testing.T) {
ci.Parallel(t)
t.Run("raft_v3", func(t *testing.T) { testCleanupDeadServer(t, 3) })
}
func testCleanupDeadServer(t *testing.T, raftVersion int) {
conf := func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion)
c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(3)
}
s1, cleanupS1 := TestServer(t, conf)
@@ -97,16 +56,11 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
defer cleanupS3()
servers := []*Server{s1, s2, s3}
// Try to join
TestJoin(t, servers...)
for _, s := range servers {
testutil.WaitForLeader(t, s.RPC)
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
t.Logf("waiting for initial stable cluster")
waitForStableLeadership(t, servers)
// Bring up a new server
s4, cleanupS4 := TestServer(t, conf)
defer cleanupS4()
@@ -115,12 +69,14 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
for i, s := range servers {
if !s.IsLeader() {
killedIdx = i
t.Logf("killing a server (index %d)", killedIdx)
s.Shutdown()
break
}
}
retry.Run(t, func(r *retry.R) {
t.Logf("waiting for server loss to be detected")
testutil.WaitForResultUntil(10*time.Second, func() (bool, error) {
for i, s := range servers {
alive := 0
if i == killedIdx {
@@ -134,27 +90,26 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
}
if alive != 2 {
r.Fatalf("expected 2 alive servers but found %v", alive)
return false, fmt.Errorf("expected 2 alive servers but found %v", alive)
}
}
})
return true, nil
}, func(err error) { must.NoError(t, err) })
// Join the new server
servers[killedIdx] = s4
t.Logf("adding server s4")
TestJoin(t, servers...)
t.Logf("waiting for dead server to be removed")
waitForStableLeadership(t, servers)
// Make sure the dead server is removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
}
func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
ci.Parallel(t)
conf := func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 5
}
@@ -174,37 +129,27 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
defer cleanupS5()
servers := []*Server{s1, s2, s3, s4, s5}
// Join the servers to s1, and wait until they are all promoted to
// voters.
TestJoin(t, servers...)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 5))
}
})
// Kill a non-leader server
t.Logf("waiting for initial stable cluster")
waitForStableLeadership(t, servers)
t.Logf("killing a non-leader server")
if leader := waitForStableLeadership(t, servers); leader == s4 {
s1, s4 = s4, s1
}
s4.Shutdown()
// Should be removed from the peers automatically
t.Logf("waiting for dead peer to be removed")
servers = []*Server{s1, s2, s3, s5}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 4))
}
})
waitForStableLeadership(t, servers)
}
func TestAutopilot_RollingUpdate(t *testing.T) {
ci.Parallel(t)
conf := func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 3
}
@@ -218,16 +163,11 @@ func TestAutopilot_RollingUpdate(t *testing.T) {
s3, cleanupS3 := TestServer(t, conf)
defer cleanupS3()
// Join the servers to s1, and wait until they are all promoted to
// voters.
servers := []*Server{s1, s2, s3}
TestJoin(t, s1, s2, s3)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
})
t.Logf("waiting for initial stable cluster")
waitForStableLeadership(t, servers)
// Add one more server like we are doing a rolling update.
t.Logf("adding server s4")
@@ -235,53 +175,26 @@ func TestAutopilot_RollingUpdate(t *testing.T) {
defer cleanupS4()
TestJoin(t, s1, s4)
// Wait for s4 to stabilize and get promoted to a voter
t.Logf("waiting for s4 to stabilize and be promoted")
servers = append(servers, s4)
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 4))
}
})
waitForStableLeadership(t, servers)
// Now kill one of the "old" nodes like we are doing a rolling update.
t.Logf("shutting down server s3")
s3.Shutdown()
isVoter := func() bool {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
t.Fatalf("err: %v", err)
}
for _, s := range future.Configuration().Servers {
if string(s.ID) == string(s4.config.NodeID) {
return s.Suffrage == raft.Voter
}
}
t.Fatalf("didn't find s4")
return false
}
t.Logf("waiting for s4 to stabalize and be promoted")
// Wait for s4 to stabilize, get promoted to a voter, and for s3 to be
// removed.
// Wait for s3 to be removed and the cluster to stabilize.
t.Logf("waiting for cluster to stabilize")
servers = []*Server{s1, s2, s4}
retry.Run(t, func(r *retry.R) {
r.Check(wantRaft(servers))
for _, s := range servers {
r.Check(wantPeers(s, 3))
}
if !isVoter() {
r.Fatalf("should be a voter")
}
})
waitForStableLeadership(t, servers)
}
func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
t.Skip("TestAutopilot_CleanupDeadServer is very flaky, removing it for now")
ci.Parallel(t)
conf := func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 3
}
s1, cleanupS1 := TestServer(t, conf)
@@ -299,38 +212,27 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) {
defer cleanupS4()
servers := []*Server{s1, s2, s3}
// Join the servers to s1
TestJoin(t, s1, s2, s3)
t.Logf("waiting for initial stable cluster")
leader := waitForStableLeadership(t, servers)
// Add s4 to peers directly
t.Logf("adding server s4 to peers directly")
addr := fmt.Sprintf("127.0.0.1:%d", s4.config.RPCAddr.Port)
future := leader.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(addr), 0, 0)
if err := future.Error(); err != nil {
t.Fatal(err)
}
// Verify we have 4 peers
peers, err := s1.numPeers()
if err != nil {
t.Fatal(err)
}
if peers != 4 {
t.Fatalf("bad: %v", peers)
}
// Wait for s4 to be removed
for _, s := range []*Server{s1, s2, s3} {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
t.Logf("waiting for 4th server to be removed")
waitForStableLeadership(t, servers)
}
func TestAutopilot_PromoteNonVoter(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS1()
@@ -339,52 +241,31 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) {
testutil.WaitForLeader(t, s1.RPC)
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // reduces test log noise
c.BootstrapExpect = 0
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS2()
TestJoin(t, s1, s2)
// Make sure we see it as a nonvoter initially. We wait until half
// the stabilization period has passed.
retry.Run(t, func(r *retry.R) {
// Note: we can't reliably detect that the server is initially a non-voter,
// because it can transition too quickly for the test setup to detect,
// especially in low-resource environments like CI. We'll assume that
// happens correctly here and only test that it transitions to become a
// voter.
testutil.WaitForResultUntil(10*time.Second, func() (bool, error) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
return false, err
}
servers := future.Configuration().Servers
if len(servers) != 2 {
r.Fatalf("bad: %v", servers)
}
if servers[1].Suffrage != raft.Nonvoter {
r.Fatalf("bad: %v", servers)
}
health := s1.autopilot.GetServerHealth(raft.ServerID(servers[1].ID))
if health == nil {
r.Fatalf("nil health, %v", s1.GetClusterHealth())
}
if !health.Healthy {
r.Fatalf("bad: %v", health)
}
if time.Since(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime/2 {
r.Fatal("stable period not elapsed")
}
})
// Make sure it ends up as a voter.
retry.Run(t, func(r *retry.R) {
future := s1.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
}
servers := future.Configuration().Servers
if len(servers) != 2 {
r.Fatalf("bad: %v", servers)
return false, fmt.Errorf("expected 2 servers, got: %v", servers)
}
if servers[1].Suffrage != raft.Voter {
r.Fatalf("bad: %v", servers)
return false, fmt.Errorf("expected server to be voter: %v", servers)
}
})
return true, nil
}, func(err error) { must.NoError(t, err) })
}

View File

@@ -12,6 +12,10 @@ import (
"github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
"github.com/hashicorp/go-version"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/mock"
@@ -20,9 +24,6 @@ import (
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/hashicorp/serf/serf"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestLeader_LeftServer(t *testing.T) {
@@ -1134,96 +1135,6 @@ func Test_diffACLRoles(t *testing.T) {
require.ElementsMatch(t, []string{aclRole3.ID, aclRole4.ID}, toUpdate)
}
func TestLeader_UpgradeRaftVersion(t *testing.T) {
ci.Parallel(t)
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 2
})
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 1
})
defer cleanupS2()
s3, cleanupS3 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 2
})
defer cleanupS3()
servers := []*Server{s1, s2, s3}
// Try to join
TestJoin(t, s1, s2, s3)
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
peers, _ := s.numPeers()
return peers == 3, nil
}, func(err error) {
t.Fatalf("should have 3 peers")
})
}
// Kill the v1 server
if err := s2.Leave(); err != nil {
t.Fatal(err)
}
for _, s := range []*Server{s1, s3} {
minVer, err := s.MinRaftProtocol()
if err != nil {
t.Fatal(err)
}
if got, want := minVer, 2; got != want {
t.Fatalf("got min raft version %d want %d", got, want)
}
}
// Replace the dead server with one running raft protocol v3
s4, cleanupS4 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
c.Datacenter = "dc1"
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS4()
TestJoin(t, s1, s4)
servers[1] = s4
// Make sure we're back to 3 total peers with the new one added via ID
for _, s := range servers {
testutil.WaitForResult(func() (bool, error) {
addrs := 0
ids := 0
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
return false, err
}
for _, server := range future.Configuration().Servers {
if string(server.ID) == string(server.Address) {
addrs++
} else {
ids++
}
}
if got, want := addrs, 2; got != want {
return false, fmt.Errorf("got %d server addresses want %d", got, want)
}
if got, want := ids, 1; got != want {
return false, fmt.Errorf("got %d server ids want %d", got, want)
}
return true, nil
}, func(err error) {
t.Fatal(err)
})
}
}
func TestLeader_Reelection(t *testing.T) {
ci.Parallel(t)
@@ -1287,7 +1198,6 @@ func TestLeader_Reelection(t *testing.T) {
func TestLeader_RollRaftServer(t *testing.T) {
ci.Parallel(t)
ci.SkipSlow(t, "flaky on GHA; #12358")
s1, cleanupS1 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
@@ -1308,148 +1218,104 @@ func TestLeader_RollRaftServer(t *testing.T) {
defer cleanupS3()
servers := []*Server{s1, s2, s3}
// Try to join
TestJoin(t, s1, s2, s3)
for _, s := range servers {
retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
}
t.Logf("waiting for initial stable cluster")
waitForStableLeadership(t, servers)
// Kill the first v2 server
t.Logf("killing server s1")
s1.Shutdown()
for _, s := range []*Server{s2, s3} {
s.RemoveFailedNode(s1.config.NodeID)
retry.Run(t, func(r *retry.R) {
configFuture := s.raft.GetConfiguration()
if len(configFuture.Configuration().Servers) != 2 {
r.Fatalf("expected 2 servers, got %d", len(configFuture.Configuration().Servers))
}
})
}
// Replace the dead server
t.Logf("waiting for server loss to be detected")
testutil.WaitForResultUntil(time.Second*10,
func() (bool, error) {
for _, s := range []*Server{s2, s3} {
err := wantPeers(s, 2)
if err != nil {
return false, err
}
}
return true, nil
},
func(err error) { must.NoError(t, err) },
)
t.Logf("adding replacement server s4")
s4, cleanupS4 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS4()
TestJoin(t, s2, s3, s4)
servers[0] = s4
servers = []*Server{s4, s2, s3}
for _, s := range []*Server{s3, s4} {
retry.RunWith(&retry.Counter{
Count: int(10 * testutil.TestMultiplier()),
Wait: time.Duration(testutil.TestMultiplier()) * time.Second * 2,
}, t, func(r *retry.R) {
configFuture := s.raft.GetConfiguration()
if len(configFuture.Configuration().Servers) != 3 {
r.Fatalf("expected 3 servers, got %d", len(configFuture.Configuration().Servers))
}
})
}
t.Logf("waiting for s4 to stabilize")
waitForStableLeadership(t, servers)
// Kill the second server
t.Logf("killing server s2")
s2.Shutdown()
for _, s := range []*Server{s3, s4} {
s.RemoveFailedNode(s2.config.NodeID)
retry.RunWith(&retry.Counter{
Count: int(10 * testutil.TestMultiplier()),
Wait: time.Duration(testutil.TestMultiplier()) * time.Second * 2,
}, t, func(r *retry.R) {
configFuture := s.raft.GetConfiguration()
if len(configFuture.Configuration().Servers) != 3 {
r.Fatalf("expected 2 servers, got %d", len(configFuture.Configuration().Servers))
}
})
}
// Replace the 2nd dead server
t.Logf("waiting for server loss to be detected")
testutil.WaitForResultUntil(time.Second*10,
func() (bool, error) {
for _, s := range []*Server{s3, s4} {
err := wantPeers(s, 2)
if err != nil {
return false, err
}
}
return true, nil
},
func(err error) { must.NoError(t, err) },
)
t.Logf("adding replacement server s5")
s5, cleanupS5 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS5()
TestJoin(t, s3, s4, s5)
servers[1] = s5
servers = []*Server{s4, s5, s3}
for _, s := range []*Server{s3, s4, s5} {
retry.RunWith(&retry.Counter{
Count: int(10 * testutil.TestMultiplier()),
Wait: time.Duration(testutil.TestMultiplier()) * time.Second * 2,
}, t, func(r *retry.R) {
configFuture := s.raft.GetConfiguration()
if len(configFuture.Configuration().Servers) != 3 {
r.Fatalf("expected 3 servers, got %d", len(configFuture.Configuration().Servers))
}
})
}
t.Logf("waiting for s5 to stabilize")
waitForStableLeadership(t, servers)
// Kill the last old server
t.Logf("killing server s3")
s3.Shutdown()
for _, s := range []*Server{s4, s5} {
s.RemoveFailedNode(s3.config.NodeID)
t.Logf("waiting for server loss to be detected")
testutil.WaitForResultUntil(time.Second*10,
func() (bool, error) {
for _, s := range []*Server{s4, s5} {
err := wantPeers(s, 2)
if err != nil {
return false, err
}
}
return true, nil
},
func(err error) { must.NoError(t, err) },
)
retry.RunWith(&retry.Counter{
Count: int(10 * testutil.TestMultiplier()),
Wait: time.Duration(testutil.TestMultiplier()) * time.Second * 2,
}, t, func(r *retry.R) {
minVer, err := s.MinRaftProtocol()
if err != nil {
r.Fatal(err)
}
if got, want := minVer, 3; got != want {
r.Fatalf("got min raft version %d want %d", got, want)
}
configFuture := s.raft.GetConfiguration()
if err != nil {
r.Fatal(err)
}
if len(configFuture.Configuration().Servers) != 3 {
r.Fatalf("expected 2 servers, got %d", len(configFuture.Configuration().Servers))
}
})
}
// Replace the last dead server with one running raft protocol v3
t.Logf("adding replacement server s6")
s6, cleanupS6 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 3
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS6()
TestJoin(t, s6, s4)
servers[2] = s6
servers = []*Server{s4, s5, s6}
// Make sure all the dead servers are removed and we're back to 3 total peers
for _, s := range servers {
retry.Run(t, func(r *retry.R) {
addrs := 0
ids := 0
future := s.raft.GetConfiguration()
if err := future.Error(); err != nil {
r.Fatal(err)
}
for _, server := range future.Configuration().Servers {
if string(server.ID) == string(server.Address) {
addrs++
} else {
ids++
}
}
if got, want := addrs, 0; got != want {
r.Fatalf("got %d server addresses want %d", got, want)
}
if got, want := ids, 3; got != want {
r.Fatalf("got %d server ids want %d", got, want)
}
})
}
t.Logf("waiting for s6 to stabilize")
waitForStableLeadership(t, servers)
}
func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
@@ -1556,7 +1422,7 @@ func TestServer_ReconcileMember(t *testing.T) {
// after leadership has been established to reduce
s3, cleanupS3 := TestServer(t, func(c *Config) {
c.BootstrapExpect = 0
c.RaftConfig.ProtocolVersion = 2
c.RaftConfig.ProtocolVersion = 3
})
defer cleanupS3()