From 4cc90c0b22e600305f1d257ea1bd6350fffcef8a Mon Sep 17 00:00:00 2001 From: Charlie Voiselle <464492+angrycub@users.noreply.github.com> Date: Mon, 18 Mar 2024 18:03:23 -0400 Subject: [PATCH] Fix LeadershipTransfer tests (#20154) Multi-node Nomad clusters under test must use the RPC calls to bootstrap the ACL subsystem. The original implementation of the testcluster tried naively supersizing the single node behavior in TestRaftRemovePeer, which mutates the single node's state directly. In the case of a multi- node cluster, the RPC calls are necessary to ensure that the data is replicated via Raft to all of the cluster members. --- nomad/operator_endpoint_test.go | 204 +++++++++++++++++++++++--------- 1 file changed, 148 insertions(+), 56 deletions(-) diff --git a/nomad/operator_endpoint_test.go b/nomad/operator_endpoint_test.go index 822f7757e..7f1234194 100644 --- a/nomad/operator_endpoint_test.go +++ b/nomad/operator_endpoint_test.go @@ -14,6 +14,7 @@ import ( "path" "reflect" "strings" + "sync" "testing" "time" @@ -378,10 +379,10 @@ func TestOperator_RaftRemovePeerByID_ACL(t *testing.T) { type testcluster struct { t *testing.T + args tcArgs server []*Server cleanup []func() token *structs.ACLToken - rpc func(string, any, any) error } func (tc testcluster) Cleanup() { @@ -406,6 +407,7 @@ func newTestCluster(t *testing.T, args tcArgs) (tc testcluster) { cSize := args.size out := testcluster{ t: t, + args: args, server: make([]*Server, cSize), cleanup: make([]func(), cSize), } @@ -419,26 +421,46 @@ func newTestCluster(t *testing.T, args tcArgs) (tc testcluster) { }) } t.Cleanup(out.Cleanup) - out.rpc = out.server[0].RPC TestJoin(t, out.server...) out.WaitForLeader() if args.enableACL { + s1 := out.server[0] + bsToken := new(structs.ACLToken) // Bootstrap the ACL subsystem - token := mock.ACLManagementToken() - err := out.server[0].State().BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, token) + req := &structs.ACLTokenBootstrapRequest{ + Token: bsToken, + WriteRequest: structs.WriteRequest{Region: s1.config.Region}, + } + resp := &structs.ACLTokenUpsertResponse{} + err := out.server[0].RPC("ACL.Bootstrap", req, resp) if err != nil { t.Fatalf("failed to bootstrap ACL token: %v", err) } - t.Logf("bootstrap token: %v", *token) - out.token = token + t.Logf("bootstrap token: %v", *resp.Tokens[0]) + out.token = resp.Tokens[0] } return out } +// WaitForLeader performs a parallel WaitForLeader over each cluster member, +// because testutil doesn't export rpcFn so we can't create a collection of +// rpcFn to use testutil.WaitForLeaders directly. func (tc testcluster) WaitForLeader() { - testutil.WaitForLeader(tc.t, tc.rpc) + var wg sync.WaitGroup + for i := 0; i < len(tc.server); i++ { + idx := i + wg.Add(1) + go func() { + defer wg.Done() + + // The WaitForLeader func uses WaitForResultRetries + // so this should timeout at 5 seconds * test multiplier + testutil.WaitForLeader(tc.t, tc.server[idx].RPC) + }() + } + wg.Wait() } func (tc testcluster) leader() *Server { @@ -450,33 +472,35 @@ func (tc testcluster) leader() *Server { } return nil } +func (tc testcluster) anyFollowerRaftServerID() raft.ServerID { + tc.WaitForLeader() + s1 := tc.server[0] + _, ldrID := s1.raft.LeaderWithID() -func (tc testcluster) anyFollower() *Server { - if len(tc.server) < 2 { - return nil - } + var tgtID raft.ServerID - testutil.WaitForLeader(tc.t, tc.rpc) - for _, s := range tc.server { - if isLeader, _ := s.getLeader(); !isLeader { - return s + s1.peerLock.Lock() + defer s1.peerLock.Unlock() + + // Find the first non-leader server in the list. + for _, sp := range s1.localPeers { + tgtID = raft.ServerID(sp.ID) + if tgtID != ldrID { + break } } - // something weird happened. - return nil + return tgtID } - -func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) { - ci.Parallel(t) - - tc := newTestCluster(t, tcArgs{enableACL: true}) - s1 := tc.leader() - codec := rpcClient(t, s1) - state := s1.fsm.State() - +func (tc testcluster) anyFollowerRaftServerAddress() raft.ServerAddress { + tc.WaitForLeader() + s1 := tc.server[0] lAddr, _ := s1.raft.LeaderWithID() var addr raft.ServerAddress + + s1.peerLock.Lock() + defer s1.peerLock.Unlock() + // Find the first non-leader server in the list. for a := range s1.localPeers { addr = a @@ -484,36 +508,83 @@ func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) { break } } + return addr +} - // Create ACL token - invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite)) +func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) { + ci.Parallel(t) + var err error - arg := &structs.RaftPeerRequest{ + tc := newTestCluster(t, tcArgs{enableACL: true}) + s1 := tc.leader() + must.NotNil(t, s1) + codec := rpcClient(t, s1) + + addr := tc.anyFollowerRaftServerAddress() + + mgmtWR := structs.WriteRequest{ + Region: s1.config.Region, + AuthToken: tc.token.SecretID, + } + + // Create invalid ACL Token + pReq := &structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{ + { + Name: "node-write-only", + Rules: `node { policy = "write" }`, + }, + }, + WriteRequest: mgmtWR, + } + pResp := &structs.GenericResponse{} + err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertPoliciesRPCMethod, pReq, pResp) + must.NoError(t, err) + + tReq := &structs.ACLTokenUpsertRequest{ + Tokens: []*structs.ACLToken{ + { + Name: "invalid", + Policies: []string{"node_write_only"}, + Type: structs.ACLClientToken, + }, + }, + WriteRequest: mgmtWR, + } + tResp := &structs.ACLTokenUpsertResponse{} + err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertTokensRPCMethod, tReq, tResp) + must.NoError(t, err) + + invalidToken := tResp.Tokens[0] + + testReq := &structs.RaftPeerRequest{ RaftIDAddress: structs.RaftIDAddress{Address: addr}, - WriteRequest: structs.WriteRequest{Region: s1.config.Region}, + WriteRequest: structs.WriteRequest{ + Region: s1.config.Region, + }, } var reply struct{} t.Run("no-token", func(t *testing.T) { // Try with no token and expect permission denied - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.Error(t, err) must.ErrorIs(t, err, rpcPermDeniedErr) }) t.Run("invalid-token", func(t *testing.T) { // Try with an invalid token and expect permission denied - arg.AuthToken = invalidToken.SecretID - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + testReq.AuthToken = invalidToken.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.Error(t, err) must.ErrorIs(t, err, rpcPermDeniedErr) }) t.Run("good-token", func(t *testing.T) { // Try with a management token - arg.AuthToken = tc.token.SecretID - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + testReq.AuthToken = tc.token.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.NoError(t, err) // Is the expected leader the new one? @@ -525,55 +596,76 @@ func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) { func TestOperator_TransferLeadershipToServerID_ACL(t *testing.T) { ci.Parallel(t) + var err error + tc := newTestCluster(t, tcArgs{enableACL: true}) s1 := tc.leader() + must.NotNil(t, s1) codec := rpcClient(t, s1) - state := s1.fsm.State() - _, ldrID := s1.raft.LeaderWithID() - - var tgtID raft.ServerID - // Find the first non-leader server in the list. - s1.peerLock.Lock() - for _, sp := range s1.localPeers { - tgtID = raft.ServerID(sp.ID) - if tgtID != ldrID { - break - } + mgmtWR := structs.WriteRequest{ + Region: s1.config.Region, + AuthToken: tc.token.SecretID, } - s1.peerLock.Unlock() - // Create ACL token - invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite)) + // Create invalid ACL Token + pReq := &structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{ + { + Name: "node-write-only", + Rules: `node { policy = "write" }`, + }, + }, + WriteRequest: mgmtWR, + } + pResp := &structs.GenericResponse{} + err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertPoliciesRPCMethod, pReq, pResp) + must.NoError(t, err) - arg := &structs.RaftPeerRequest{ + tReq := &structs.ACLTokenUpsertRequest{ + Tokens: []*structs.ACLToken{ + { + Name: "invalid", + Policies: []string{"node_write_only"}, + Type: structs.ACLClientToken, + }, + }, + WriteRequest: mgmtWR, + } + tResp := &structs.ACLTokenUpsertResponse{} + err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertTokensRPCMethod, tReq, tResp) + must.NoError(t, err) + + invalidToken := tResp.Tokens[0] + + tgtID := tc.anyFollowerRaftServerID() + testReq := &structs.RaftPeerRequest{ RaftIDAddress: structs.RaftIDAddress{ ID: tgtID, }, WriteRequest: structs.WriteRequest{Region: s1.config.Region}, } - var reply struct{} t.Run("no-token", func(t *testing.T) { // Try with no token and expect permission denied - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.Error(t, err) must.ErrorIs(t, err, rpcPermDeniedErr) }) t.Run("invalid-token", func(t *testing.T) { // Try with an invalid token and expect permission denied - arg.AuthToken = invalidToken.SecretID - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + testReq.AuthToken = invalidToken.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.Error(t, err) must.ErrorIs(t, err, rpcPermDeniedErr) }) t.Run("good-token", func(t *testing.T) { // Try with a management token - arg.AuthToken = tc.token.SecretID - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + testReq.AuthToken = tc.token.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.NoError(t, err) // Is the expected leader the new one?