diff --git a/nomad/operator_endpoint_test.go b/nomad/operator_endpoint_test.go index 822f7757e..7f1234194 100644 --- a/nomad/operator_endpoint_test.go +++ b/nomad/operator_endpoint_test.go @@ -14,6 +14,7 @@ import ( "path" "reflect" "strings" + "sync" "testing" "time" @@ -378,10 +379,10 @@ func TestOperator_RaftRemovePeerByID_ACL(t *testing.T) { type testcluster struct { t *testing.T + args tcArgs server []*Server cleanup []func() token *structs.ACLToken - rpc func(string, any, any) error } func (tc testcluster) Cleanup() { @@ -406,6 +407,7 @@ func newTestCluster(t *testing.T, args tcArgs) (tc testcluster) { cSize := args.size out := testcluster{ t: t, + args: args, server: make([]*Server, cSize), cleanup: make([]func(), cSize), } @@ -419,26 +421,46 @@ func newTestCluster(t *testing.T, args tcArgs) (tc testcluster) { }) } t.Cleanup(out.Cleanup) - out.rpc = out.server[0].RPC TestJoin(t, out.server...) out.WaitForLeader() if args.enableACL { + s1 := out.server[0] + bsToken := new(structs.ACLToken) // Bootstrap the ACL subsystem - token := mock.ACLManagementToken() - err := out.server[0].State().BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, token) + req := &structs.ACLTokenBootstrapRequest{ + Token: bsToken, + WriteRequest: structs.WriteRequest{Region: s1.config.Region}, + } + resp := &structs.ACLTokenUpsertResponse{} + err := out.server[0].RPC("ACL.Bootstrap", req, resp) if err != nil { t.Fatalf("failed to bootstrap ACL token: %v", err) } - t.Logf("bootstrap token: %v", *token) - out.token = token + t.Logf("bootstrap token: %v", *resp.Tokens[0]) + out.token = resp.Tokens[0] } return out } +// WaitForLeader performs a parallel WaitForLeader over each cluster member, +// because testutil doesn't export rpcFn so we can't create a collection of +// rpcFn to use testutil.WaitForLeaders directly. 
func (tc testcluster) WaitForLeader() { - testutil.WaitForLeader(tc.t, tc.rpc) + var wg sync.WaitGroup + for i := 0; i < len(tc.server); i++ { + idx := i + wg.Add(1) + go func() { + defer wg.Done() + + // The WaitForLeader func uses WaitForResultRetries + // so this should timeout at 5 seconds * test multiplier + testutil.WaitForLeader(tc.t, tc.server[idx].RPC) + }() + } + wg.Wait() } func (tc testcluster) leader() *Server { @@ -450,33 +472,35 @@ func (tc testcluster) leader() *Server { } return nil } +func (tc testcluster) anyFollowerRaftServerID() raft.ServerID { + tc.WaitForLeader() + s1 := tc.server[0] + _, ldrID := s1.raft.LeaderWithID() -func (tc testcluster) anyFollower() *Server { - if len(tc.server) < 2 { - return nil - } + var tgtID raft.ServerID - testutil.WaitForLeader(tc.t, tc.rpc) - for _, s := range tc.server { - if isLeader, _ := s.getLeader(); !isLeader { - return s + s1.peerLock.Lock() + defer s1.peerLock.Unlock() + + // Find the first non-leader server in the list. + for _, sp := range s1.localPeers { + tgtID = raft.ServerID(sp.ID) + if tgtID != ldrID { + break } } - // something weird happened. - return nil + return tgtID } - -func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) { - ci.Parallel(t) - - tc := newTestCluster(t, tcArgs{enableACL: true}) - s1 := tc.leader() - codec := rpcClient(t, s1) - state := s1.fsm.State() - +func (tc testcluster) anyFollowerRaftServerAddress() raft.ServerAddress { + tc.WaitForLeader() + s1 := tc.server[0] lAddr, _ := s1.raft.LeaderWithID() var addr raft.ServerAddress + + s1.peerLock.Lock() + defer s1.peerLock.Unlock() + // Find the first non-leader server in the list. 
for a := range s1.localPeers { addr = a @@ -484,36 +508,83 @@ func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) { break } } + return addr +} - // Create ACL token - invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite)) +func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) { + ci.Parallel(t) + var err error - arg := &structs.RaftPeerRequest{ + tc := newTestCluster(t, tcArgs{enableACL: true}) + s1 := tc.leader() + must.NotNil(t, s1) + codec := rpcClient(t, s1) + + addr := tc.anyFollowerRaftServerAddress() + + mgmtWR := structs.WriteRequest{ + Region: s1.config.Region, + AuthToken: tc.token.SecretID, + } + + // Create invalid ACL Token + pReq := &structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{ + { + Name: "node-write-only", + Rules: `node { policy = "write" }`, + }, + }, + WriteRequest: mgmtWR, + } + pResp := &structs.GenericResponse{} + err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertPoliciesRPCMethod, pReq, pResp) + must.NoError(t, err) + + tReq := &structs.ACLTokenUpsertRequest{ + Tokens: []*structs.ACLToken{ + { + Name: "invalid", + Policies: []string{"node-write-only"}, + Type: structs.ACLClientToken, + }, + }, + WriteRequest: mgmtWR, + } + tResp := &structs.ACLTokenUpsertResponse{} + err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertTokensRPCMethod, tReq, tResp) + must.NoError(t, err) + + invalidToken := tResp.Tokens[0] + + testReq := &structs.RaftPeerRequest{ RaftIDAddress: structs.RaftIDAddress{Address: addr}, - WriteRequest: structs.WriteRequest{Region: s1.config.Region}, + WriteRequest: structs.WriteRequest{ + Region: s1.config.Region, + }, } var reply struct{} t.Run("no-token", func(t *testing.T) { // Try with no token and expect permission denied - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) 
must.Error(t, err) must.ErrorIs(t, err, rpcPermDeniedErr) }) t.Run("invalid-token", func(t *testing.T) { // Try with an invalid token and expect permission denied - arg.AuthToken = invalidToken.SecretID - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + testReq.AuthToken = invalidToken.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.Error(t, err) must.ErrorIs(t, err, rpcPermDeniedErr) }) t.Run("good-token", func(t *testing.T) { // Try with a management token - arg.AuthToken = tc.token.SecretID - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + testReq.AuthToken = tc.token.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.NoError(t, err) // Is the expected leader the new one? @@ -525,55 +596,76 @@ func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) { func TestOperator_TransferLeadershipToServerID_ACL(t *testing.T) { ci.Parallel(t) + var err error + tc := newTestCluster(t, tcArgs{enableACL: true}) s1 := tc.leader() + must.NotNil(t, s1) codec := rpcClient(t, s1) - state := s1.fsm.State() - _, ldrID := s1.raft.LeaderWithID() - - var tgtID raft.ServerID - // Find the first non-leader server in the list. 
- s1.peerLock.Lock() - for _, sp := range s1.localPeers { - tgtID = raft.ServerID(sp.ID) - if tgtID != ldrID { - break - } + mgmtWR := structs.WriteRequest{ + Region: s1.config.Region, + AuthToken: tc.token.SecretID, } - s1.peerLock.Unlock() - // Create ACL token - invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite)) + // Create invalid ACL Token + pReq := &structs.ACLPolicyUpsertRequest{ + Policies: []*structs.ACLPolicy{ + { + Name: "node-write-only", + Rules: `node { policy = "write" }`, + }, + }, + WriteRequest: mgmtWR, + } + pResp := &structs.GenericResponse{} + err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertPoliciesRPCMethod, pReq, pResp) + must.NoError(t, err) - arg := &structs.RaftPeerRequest{ + tReq := &structs.ACLTokenUpsertRequest{ + Tokens: []*structs.ACLToken{ + { + Name: "invalid", + Policies: []string{"node-write-only"}, + Type: structs.ACLClientToken, + }, + }, + WriteRequest: mgmtWR, + } + tResp := &structs.ACLTokenUpsertResponse{} + err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertTokensRPCMethod, tReq, tResp) + must.NoError(t, err) + + invalidToken := tResp.Tokens[0] + + tgtID := tc.anyFollowerRaftServerID() + testReq := &structs.RaftPeerRequest{ RaftIDAddress: structs.RaftIDAddress{ ID: tgtID, }, WriteRequest: structs.WriteRequest{Region: s1.config.Region}, } - var reply struct{} t.Run("no-token", func(t *testing.T) { // Try with no token and expect permission denied - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.Error(t, err) must.ErrorIs(t, err, rpcPermDeniedErr) }) t.Run("invalid-token", func(t *testing.T) { // Try with an invalid token and expect permission denied - arg.AuthToken = invalidToken.SecretID - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + testReq.AuthToken = 
invalidToken.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.Error(t, err) must.ErrorIs(t, err, rpcPermDeniedErr) }) t.Run("good-token", func(t *testing.T) { // Try with a management token - arg.AuthToken = tc.token.SecretID - err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply) + testReq.AuthToken = tc.token.SecretID + err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply) must.NoError(t, err) // Is the expected leader the new one?