diff --git a/nomad/autopilot.go b/nomad/autopilot.go
index 3d56ef4df..c5a382ece 100644
--- a/nomad/autopilot.go
+++ b/nomad/autopilot.go
@@ -85,7 +85,10 @@ func (d *AutopilotDelegate) RemoveFailedServer(failedSrv *autopilot.Server) {
 	go func() {
 		err := d.server.RemoveFailedNode(failedSrv.Name)
 		if err != nil {
-			d.server.logger.Error("could not remove failed server", "server", string(failedSrv.ID))
+			d.server.logger.Error("could not remove failed server",
+				"server", string(failedSrv.ID),
+				"error", err,
+			)
 		}
 	}()
 }
diff --git a/nomad/autopilot_test.go b/nomad/autopilot_test.go
index 417474126..fb2afbe08 100644
--- a/nomad/autopilot_test.go
+++ b/nomad/autopilot_test.go
@@ -5,11 +5,10 @@ import (
 	"testing"
 	"time"
 
-	// TODO: replace this with our own helper
-	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/raft"
 	autopilot "github.com/hashicorp/raft-autopilot"
 	"github.com/hashicorp/serf/serf"
+	"github.com/shoenig/test/must"
 
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/testutil"
@@ -38,53 +37,13 @@ func wantPeers(s *Server, peers int) error {
 	return nil
 }
 
-// wantRaft determines if the servers have all of each other in their
-// Raft configurations,
-func wantRaft(servers []*Server) error {
-	// Make sure all the servers are represented in the Raft config,
-	// and that there are no extras.
-	verifyRaft := func(c raft.Configuration) error {
-		want := make(map[raft.ServerID]bool)
-		for _, s := range servers {
-			want[s.config.RaftConfig.LocalID] = true
-		}
-
-		found := make([]raft.ServerID, 0, len(c.Servers))
-		for _, s := range c.Servers {
-			found = append(found, s.ID)
-			if !want[s.ID] {
-				return fmt.Errorf("don't want %q", s.ID)
-			}
-			delete(want, s.ID)
-		}
-
-		if len(want) > 0 {
-			return fmt.Errorf("didn't find %v in %#+v", want, found)
-		}
-		return nil
-	}
-
-	for _, s := range servers {
-		future := s.raft.GetConfiguration()
-		if err := future.Error(); err != nil {
-			return err
-		}
-		if err := verifyRaft(future.Configuration()); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
 func TestAutopilot_CleanupDeadServer(t *testing.T) {
 	ci.Parallel(t)
-	t.Run("raft_v3", func(t *testing.T) { testCleanupDeadServer(t, 3) })
-}
-func testCleanupDeadServer(t *testing.T, raftVersion int) {
 	conf := func(c *Config) {
+		c.NumSchedulers = 0 // reduces test log noise
 		c.BootstrapExpect = 3
-		c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(raftVersion)
+		c.RaftConfig.ProtocolVersion = raft.ProtocolVersion(3)
 	}
 
 	s1, cleanupS1 := TestServer(t, conf)
@@ -97,16 +56,11 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
 	defer cleanupS2()
 	defer cleanupS3()
 	servers := []*Server{s1, s2, s3}
-
-	// Try to join
 	TestJoin(t, servers...)
-	for _, s := range servers {
-		testutil.WaitForLeader(t, s.RPC)
-		retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
-	}
+	t.Logf("waiting for initial stable cluster")
+	waitForStableLeadership(t, servers)
-	// Bring up a new server
 	s4, cleanupS4 := TestServer(t, conf)
 	defer cleanupS4()
@@ -115,12 +69,14 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
 	for i, s := range servers {
 		if !s.IsLeader() {
 			killedIdx = i
+			t.Logf("killing a server (index %d)", killedIdx)
 			s.Shutdown()
 			break
 		}
 	}
-	retry.Run(t, func(r *retry.R) {
+	t.Logf("waiting for server loss to be detected")
+	testutil.WaitForResultUntil(10*time.Second, func() (bool, error) {
 		for i, s := range servers {
 			alive := 0
 			if i == killedIdx {
@@ -134,27 +90,26 @@ func testCleanupDeadServer(t *testing.T, raftVersion int) {
 			}
 
 			if alive != 2 {
-				r.Fatalf("expected 2 alive servers but found %v", alive)
+				return false, fmt.Errorf("expected 2 alive servers but found %v", alive)
 			}
 		}
-	})
+		return true, nil
+	}, func(err error) { must.NoError(t, err) })
 
 	// Join the new server
 	servers[killedIdx] = s4
+	t.Logf("adding server s4")
 	TestJoin(t, servers...)
 
+	t.Logf("waiting for dead server to be removed")
 	waitForStableLeadership(t, servers)
-
-	// Make sure the dead server is removed and we're back to 3 total peers
-	for _, s := range servers {
-		retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
-	}
 }
 
 func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
 	ci.Parallel(t)
 
 	conf := func(c *Config) {
+		c.NumSchedulers = 0 // reduces test log noise
 		c.BootstrapExpect = 5
 	}
 
@@ -174,37 +129,27 @@ func TestAutopilot_CleanupDeadServerPeriodic(t *testing.T) {
 	defer cleanupS5()
 
 	servers := []*Server{s1, s2, s3, s4, s5}
-
-	// Join the servers to s1, and wait until they are all promoted to
-	// voters.
 	TestJoin(t, servers...)
-	retry.Run(t, func(r *retry.R) {
-		r.Check(wantRaft(servers))
-		for _, s := range servers {
-			r.Check(wantPeers(s, 5))
-		}
-	})
-	// Kill a non-leader server
+	t.Logf("waiting for initial stable cluster")
+	waitForStableLeadership(t, servers)
+
+	t.Logf("killing a non-leader server")
 	if leader := waitForStableLeadership(t, servers); leader == s4 {
 		s1, s4 = s4, s1
 	}
 	s4.Shutdown()
-	// Should be removed from the peers automatically
+	t.Logf("waiting for dead peer to be removed")
 	servers = []*Server{s1, s2, s3, s5}
-	retry.Run(t, func(r *retry.R) {
-		r.Check(wantRaft(servers))
-		for _, s := range servers {
-			r.Check(wantPeers(s, 4))
-		}
-	})
+	waitForStableLeadership(t, servers)
 }
 
 func TestAutopilot_RollingUpdate(t *testing.T) {
 	ci.Parallel(t)
 
 	conf := func(c *Config) {
+		c.NumSchedulers = 0 // reduces test log noise
 		c.BootstrapExpect = 3
 		c.RaftConfig.ProtocolVersion = 3
 	}
@@ -218,16 +163,11 @@ func TestAutopilot_RollingUpdate(t *testing.T) {
 	s3, cleanupS3 := TestServer(t, conf)
 	defer cleanupS3()
-	// Join the servers to s1, and wait until they are all promoted to
-	// voters.
 	servers := []*Server{s1, s2, s3}
 	TestJoin(t, s1, s2, s3)
-	retry.Run(t, func(r *retry.R) {
-		r.Check(wantRaft(servers))
-		for _, s := range servers {
-			r.Check(wantPeers(s, 3))
-		}
-	})
+
+	t.Logf("waiting for initial stable cluster")
+	waitForStableLeadership(t, servers)
 
 	// Add one more server like we are doing a rolling update.
t.Logf("adding server s4") @@ -235,53 +175,26 @@ func TestAutopilot_RollingUpdate(t *testing.T) { defer cleanupS4() TestJoin(t, s1, s4) + // Wait for s4 to stabilize and get promoted to a voter + t.Logf("waiting for s4 to stabilize and be promoted") servers = append(servers, s4) - retry.Run(t, func(r *retry.R) { - r.Check(wantRaft(servers)) - for _, s := range servers { - r.Check(wantPeers(s, 4)) - } - }) + waitForStableLeadership(t, servers) // Now kill one of the "old" nodes like we are doing a rolling update. t.Logf("shutting down server s3") s3.Shutdown() - isVoter := func() bool { - future := s1.raft.GetConfiguration() - if err := future.Error(); err != nil { - t.Fatalf("err: %v", err) - } - for _, s := range future.Configuration().Servers { - if string(s.ID) == string(s4.config.NodeID) { - return s.Suffrage == raft.Voter - } - } - t.Fatalf("didn't find s4") - return false - } - - t.Logf("waiting for s4 to stabalize and be promoted") - - // Wait for s4 to stabilize, get promoted to a voter, and for s3 to be - // removed. + // Wait for s3 to be removed and the cluster to stablize. + t.Logf("waiting for cluster to stabilize") servers = []*Server{s1, s2, s4} - retry.Run(t, func(r *retry.R) { - r.Check(wantRaft(servers)) - for _, s := range servers { - r.Check(wantPeers(s, 3)) - } - if !isVoter() { - r.Fatalf("should be a voter") - } - }) + waitForStableLeadership(t, servers) } func TestAutopilot_CleanupStaleRaftServer(t *testing.T) { - t.Skip("TestAutopilot_CleanupDeadServer is very flaky, removing it for now") ci.Parallel(t) conf := func(c *Config) { + c.NumSchedulers = 0 // reduces test log noise c.BootstrapExpect = 3 } s1, cleanupS1 := TestServer(t, conf) @@ -299,38 +212,27 @@ func TestAutopilot_CleanupStaleRaftServer(t *testing.T) { defer cleanupS4() servers := []*Server{s1, s2, s3} - - // Join the servers to s1 TestJoin(t, s1, s2, s3) + t.Logf("waiting for initial stable cluster") leader := waitForStableLeadership(t, servers) - // Add s4 to peers directly + t.Logf("adding server s4 to peers directly") addr := fmt.Sprintf("127.0.0.1:%d", s4.config.RPCAddr.Port) future := leader.raft.AddVoter(raft.ServerID(s4.config.NodeID), raft.ServerAddress(addr), 0, 0) if err := future.Error(); err != nil { t.Fatal(err) } - // Verify we have 4 peers - peers, err := s1.numPeers() - if err != nil { - t.Fatal(err) - } - if peers != 4 { - t.Fatalf("bad: %v", peers) - } - - // Wait for s4 to be removed - for _, s := range []*Server{s1, s2, s3} { - retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) }) - } + t.Logf("waiting for 4th server to be removed") + waitForStableLeadership(t, servers) } func TestAutopilot_PromoteNonVoter(t *testing.T) { ci.Parallel(t) s1, cleanupS1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // reduces test log noise c.RaftConfig.ProtocolVersion = 3 }) defer cleanupS1() @@ -339,52 +241,31 @@ func TestAutopilot_PromoteNonVoter(t *testing.T) { testutil.WaitForLeader(t, s1.RPC) s2, cleanupS2 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // reduces test log noise c.BootstrapExpect = 0 c.RaftConfig.ProtocolVersion = 3 }) defer cleanupS2() TestJoin(t, s1, s2) - // Make sure we see it as a nonvoter initially. We wait until half - // the stabilization period has passed. - retry.Run(t, func(r *retry.R) { + // Note: we can't reliably detect that the server is initially a non-voter, + // because it can transition too quickly for the test setup to detect, + // especially in low-resource environments like CI. 
+	// happens correctly here and only test that it transitions to become a
+	// voter.
+	testutil.WaitForResultUntil(10*time.Second, func() (bool, error) {
 		future := s1.raft.GetConfiguration()
 		if err := future.Error(); err != nil {
-			r.Fatal(err)
+			return false, err
 		}
-
 		servers := future.Configuration().Servers
 		if len(servers) != 2 {
-			r.Fatalf("bad: %v", servers)
-		}
-		if servers[1].Suffrage != raft.Nonvoter {
-			r.Fatalf("bad: %v", servers)
-		}
-		health := s1.autopilot.GetServerHealth(raft.ServerID(servers[1].ID))
-		if health == nil {
-			r.Fatalf("nil health, %v", s1.GetClusterHealth())
-		}
-		if !health.Healthy {
-			r.Fatalf("bad: %v", health)
-		}
-		if time.Since(health.StableSince) < s1.config.AutopilotConfig.ServerStabilizationTime/2 {
-			r.Fatal("stable period not elapsed")
-		}
-	})
-
-	// Make sure it ends up as a voter.
-	retry.Run(t, func(r *retry.R) {
-		future := s1.raft.GetConfiguration()
-		if err := future.Error(); err != nil {
-			r.Fatal(err)
-		}
-
-		servers := future.Configuration().Servers
-		if len(servers) != 2 {
-			r.Fatalf("bad: %v", servers)
+			return false, fmt.Errorf("expected 2 servers, got: %v", servers)
 		}
 		if servers[1].Suffrage != raft.Voter {
-			r.Fatalf("bad: %v", servers)
+			return false, fmt.Errorf("expected server to be voter: %v", servers)
 		}
-	})
+		return true, nil
+	}, func(err error) { must.NoError(t, err) })
+
 }
diff --git a/nomad/leader_test.go b/nomad/leader_test.go
index 2eab55473..534d42ddf 100644
--- a/nomad/leader_test.go
+++ b/nomad/leader_test.go
@@ -12,6 +12,10 @@ import (
 	"github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-version"
+	"github.com/shoenig/test/must"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+
 	"github.com/hashicorp/nomad/ci"
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/mock"
@@ -20,9 +24,6 @@ import (
 	"github.com/hashicorp/nomad/testutil"
 	"github.com/hashicorp/raft"
 	"github.com/hashicorp/serf/serf"
-	"github.com/shoenig/test/must"
-	"github.com/stretchr/testify/assert"
-	"github.com/stretchr/testify/require"
 )
 
 func TestLeader_LeftServer(t *testing.T) {
@@ -1134,96 +1135,6 @@ func Test_diffACLRoles(t *testing.T) {
 	require.ElementsMatch(t, []string{aclRole3.ID, aclRole4.ID}, toUpdate)
 }
 
-func TestLeader_UpgradeRaftVersion(t *testing.T) {
-	ci.Parallel(t)
-
-	s1, cleanupS1 := TestServer(t, func(c *Config) {
-		c.Datacenter = "dc1"
-		c.RaftConfig.ProtocolVersion = 2
-	})
-	defer cleanupS1()
-
-	s2, cleanupS2 := TestServer(t, func(c *Config) {
-		c.BootstrapExpect = 3
-		c.RaftConfig.ProtocolVersion = 1
-	})
-	defer cleanupS2()
-
-	s3, cleanupS3 := TestServer(t, func(c *Config) {
-		c.BootstrapExpect = 3
-		c.RaftConfig.ProtocolVersion = 2
-	})
-	defer cleanupS3()
-
-	servers := []*Server{s1, s2, s3}
-
-	// Try to join
-	TestJoin(t, s1, s2, s3)
-
-	for _, s := range servers {
-		testutil.WaitForResult(func() (bool, error) {
-			peers, _ := s.numPeers()
-			return peers == 3, nil
-		}, func(err error) {
-			t.Fatalf("should have 3 peers")
-		})
-	}
-
-	// Kill the v1 server
-	if err := s2.Leave(); err != nil {
-		t.Fatal(err)
-	}
-
-	for _, s := range []*Server{s1, s3} {
-		minVer, err := s.MinRaftProtocol()
-		if err != nil {
-			t.Fatal(err)
-		}
-		if got, want := minVer, 2; got != want {
-			t.Fatalf("got min raft version %d want %d", got, want)
-		}
-	}
-
-	// Replace the dead server with one running raft protocol v3
-	s4, cleanupS4 := TestServer(t, func(c *Config) {
-		c.BootstrapExpect = 3
-		c.Datacenter = "dc1"
-		c.RaftConfig.ProtocolVersion = 3
-	})
-	defer cleanupS4()
-	TestJoin(t, s1, s4)
-	servers[1] = s4
-
-	// Make sure we're back to 3 total peers with the new one added via ID
-	for _, s := range servers {
-		testutil.WaitForResult(func() (bool, error) {
-			addrs := 0
-			ids := 0
-			future := s.raft.GetConfiguration()
-			if err := future.Error(); err != nil {
-				return false, err
-			}
-			for _, server := range future.Configuration().Servers {
-				if string(server.ID) == string(server.Address) {
-					addrs++
-				} else {
-					ids++
-				}
-			}
-			if got, want := addrs, 2; got != want {
-				return false, fmt.Errorf("got %d server addresses want %d", got, want)
-			}
-			if got, want := ids, 1; got != want {
-				return false, fmt.Errorf("got %d server ids want %d", got, want)
-			}
-
-			return true, nil
-		}, func(err error) {
-			t.Fatal(err)
-		})
-	}
-}
-
 func TestLeader_Reelection(t *testing.T) {
 	ci.Parallel(t)
@@ -1287,7 +1198,6 @@ func TestLeader_Reelection(t *testing.T) {
 
 func TestLeader_RollRaftServer(t *testing.T) {
 	ci.Parallel(t)
-	ci.SkipSlow(t, "flaky on GHA; #12358")
 
 	s1, cleanupS1 := TestServer(t, func(c *Config) {
 		c.BootstrapExpect = 3
@@ -1308,148 +1218,104 @@ func TestLeader_RollRaftServer(t *testing.T) {
 	defer cleanupS3()
 	servers := []*Server{s1, s2, s3}
-
-	// Try to join
 	TestJoin(t, s1, s2, s3)
-	for _, s := range servers {
-		retry.Run(t, func(r *retry.R) { r.Check(wantPeers(s, 3)) })
-	}
+	t.Logf("waiting for initial stable cluster")
+	waitForStableLeadership(t, servers)
-	// Kill the first v2 server
+	t.Logf("killing server s1")
 	s1.Shutdown()
-
 	for _, s := range []*Server{s2, s3} {
 		s.RemoveFailedNode(s1.config.NodeID)
-
-		retry.Run(t, func(r *retry.R) {
-			configFuture := s.raft.GetConfiguration()
-			if len(configFuture.Configuration().Servers) != 2 {
-				r.Fatalf("expected 2 servers, got %d", len(configFuture.Configuration().Servers))
-			}
-		})
 	}
-	// Replace the dead server
+	t.Logf("waiting for server loss to be detected")
+	testutil.WaitForResultUntil(time.Second*10,
+		func() (bool, error) {
+			for _, s := range []*Server{s2, s3} {
+				err := wantPeers(s, 2)
+				if err != nil {
+					return false, err
+				}
+			}
+			return true, nil
+
+		},
+		func(err error) { must.NoError(t, err) },
+	)
+
+	t.Logf("adding replacement server s4")
 	s4, cleanupS4 := TestServer(t, func(c *Config) {
 		c.BootstrapExpect = 3
 		c.RaftConfig.ProtocolVersion = 3
 	})
 	defer cleanupS4()
 	TestJoin(t, s2, s3, s4)
-	servers[0] = s4
+	servers = []*Server{s4, s2, s3}
-	for _, s := range []*Server{s3, s4} {
-		retry.RunWith(&retry.Counter{
-			Count: int(10 * testutil.TestMultiplier()),
-			Wait: time.Duration(testutil.TestMultiplier()) * time.Second * 2,
-		}, t, func(r *retry.R) {
-			configFuture := s.raft.GetConfiguration()
-			if len(configFuture.Configuration().Servers) != 3 {
-				r.Fatalf("expected 3 servers, got %d", len(configFuture.Configuration().Servers))
-			}
-		})
-	}
+	t.Logf("waiting for s4 to stabilize")
+	waitForStableLeadership(t, servers)
-	// Kill the second server
+	t.Logf("killing server s2")
 	s2.Shutdown()
-
 	for _, s := range []*Server{s3, s4} {
 		s.RemoveFailedNode(s2.config.NodeID)
-
-		retry.RunWith(&retry.Counter{
-			Count: int(10 * testutil.TestMultiplier()),
-			Wait: time.Duration(testutil.TestMultiplier()) * time.Second * 2,
-		}, t, func(r *retry.R) {
-			configFuture := s.raft.GetConfiguration()
-			if len(configFuture.Configuration().Servers) != 3 {
-				r.Fatalf("expected 2 servers, got %d", len(configFuture.Configuration().Servers))
-			}
-		})
 	}
-	// Replace the 2nd dead server
+	t.Logf("waiting for server loss to be detected")
+	testutil.WaitForResultUntil(time.Second*10,
+		func() (bool, error) {
+			for _, s := range []*Server{s3, s4} {
+				err := wantPeers(s, 2)
+				if err != nil {
+					return false, err
+				}
+			}
+			return true, nil
+		},
+		func(err error) { must.NoError(t, err) },
+	)
+
+	t.Logf("adding replacement server s5")
 	s5, cleanupS5 := TestServer(t, func(c *Config) {
 		c.BootstrapExpect = 3
 		c.RaftConfig.ProtocolVersion = 3
 	})
 	defer cleanupS5()
 	TestJoin(t, s3, s4, s5)
-	servers[1] = s5
+	servers = []*Server{s4, s5, s3}
-	for _, s := range []*Server{s3, s4, s5} {
-		retry.RunWith(&retry.Counter{
-			Count: int(10 * testutil.TestMultiplier()),
-			Wait: time.Duration(testutil.TestMultiplier()) * time.Second * 2,
-		}, t, func(r *retry.R) {
-			configFuture := s.raft.GetConfiguration()
-			if len(configFuture.Configuration().Servers) != 3 {
-				r.Fatalf("expected 3 servers, got %d", len(configFuture.Configuration().Servers))
-			}
-		})
-	}
+	t.Logf("waiting for s5 to stabilize")
+	waitForStableLeadership(t, servers)
-	// Kill the last old server
+	t.Logf("killing server s3")
 	s3.Shutdown()
-	for _, s := range []*Server{s4, s5} {
-		s.RemoveFailedNode(s3.config.NodeID)
+	t.Logf("waiting for server loss to be detected")
+	testutil.WaitForResultUntil(time.Second*10,
+		func() (bool, error) {
+			for _, s := range []*Server{s4, s5} {
+				err := wantPeers(s, 2)
+				if err != nil {
+					return false, err
+				}
+			}
+			return true, nil
+		},
+		func(err error) { must.NoError(t, err) },
+	)
-		retry.RunWith(&retry.Counter{
-			Count: int(10 * testutil.TestMultiplier()),
-			Wait: time.Duration(testutil.TestMultiplier()) * time.Second * 2,
-		}, t, func(r *retry.R) {
-			minVer, err := s.MinRaftProtocol()
-			if err != nil {
-				r.Fatal(err)
-			}
-			if got, want := minVer, 3; got != want {
-				r.Fatalf("got min raft version %d want %d", got, want)
-			}
-
-			configFuture := s.raft.GetConfiguration()
-			if err != nil {
-				r.Fatal(err)
-			}
-			if len(configFuture.Configuration().Servers) != 3 {
-				r.Fatalf("expected 2 servers, got %d", len(configFuture.Configuration().Servers))
-			}
-		})
-	}
-
-	// Replace the last dead server with one running raft protocol v3
+	t.Logf("adding replacement server s6")
 	s6, cleanupS6 := TestServer(t, func(c *Config) {
 		c.BootstrapExpect = 3
 		c.RaftConfig.ProtocolVersion = 3
 	})
 	defer cleanupS6()
 	TestJoin(t, s6, s4)
-	servers[2] = s6
+	servers = []*Server{s4, s5, s6}
-
-	// Make sure all the dead servers are removed and we're back to 3 total peers
-	for _, s := range servers {
-		retry.Run(t, func(r *retry.R) {
-			addrs := 0
-			ids := 0
-			future := s.raft.GetConfiguration()
-			if err := future.Error(); err != nil {
-				r.Fatal(err)
-			}
-			for _, server := range future.Configuration().Servers {
-				if string(server.ID) == string(server.Address) {
-					addrs++
-				} else {
-					ids++
-				}
-			}
-			if got, want := addrs, 0; got != want {
-				r.Fatalf("got %d server addresses want %d", got, want)
-			}
-			if got, want := ids, 3; got != want {
-				r.Fatalf("got %d server ids want %d", got, want)
-			}
-		})
-	}
+	t.Logf("waiting for s6 to stabilize")
+	waitForStableLeadership(t, servers)
 }
 
 func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
@@ -1556,7 +1422,7 @@ func TestServer_ReconcileMember(t *testing.T) {
 	// after leadership has been established to reduce
 	s3, cleanupS3 := TestServer(t, func(c *Config) {
 		c.BootstrapExpect = 0
-		c.RaftConfig.ProtocolVersion = 2
+		c.RaftConfig.ProtocolVersion = 3
 	})
 	defer cleanupS3()
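The rewritten tests lean almost entirely on the test package's waitForStableLeadership helper, which is defined elsewhere in the nomad test code and is not part of this diff. As orientation only, a helper of that general shape might look like the sketch below; the name waitForStableLeadershipSketch, the 30-second budget, and the exact checks are illustrative assumptions rather than the real implementation. It composes the same pieces the new test code already uses: wantPeers, testutil.WaitForResultUntil, and must.NoError.

// Hypothetical sketch only; the real waitForStableLeadership may differ in
// signature, timeout, and checks.
func waitForStableLeadershipSketch(t *testing.T, servers []*Server) *Server {
	t.Helper()

	var leader *Server
	testutil.WaitForResultUntil(30*time.Second, func() (bool, error) {
		// Every server must agree on the expected number of raft peers.
		for _, s := range servers {
			if err := wantPeers(s, len(servers)); err != nil {
				return false, err
			}
		}

		// Exactly one server should currently report leadership.
		leader = nil
		for _, s := range servers {
			if s.IsLeader() {
				if leader != nil {
					return false, fmt.Errorf("found more than one leader")
				}
				leader = s
			}
		}
		if leader == nil {
			return false, fmt.Errorf("no leader elected yet")
		}
		return true, nil
	}, func(err error) { must.NoError(t, err) })

	return leader
}

Waiting on both conditions (agreed peer count and a single leader) before returning is what lets the rewritten tests drop the per-server retry.Run loops: once a helper like this returns, the cluster membership the tests care about is already stable.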