Fix LeadershipTransfer tests (#20154)
Multi-node Nomad clusters under test must bootstrap the ACL subsystem through RPC calls. The original testcluster implementation naively scaled up the single-node behavior from TestRaftRemovePeer, which mutates a single node's state store directly. In a multi-node cluster, the RPC calls are necessary to ensure the data is replicated via Raft to all of the cluster members.
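The distinction matters because State().BootstrapACLTokens writes straight into one server's local state store, while the ACL.Bootstrap RPC is forwarded to the leader and committed to the Raft log, so every member applies it. A minimal sketch of the RPC path, using only the types and calls that appear in the diff below (the bootstrapViaRPC helper and its servers argument are hypothetical; this is not code from the change itself):

// bootstrapViaRPC bootstraps ACLs once for a whole cluster: the request goes
// through the ACL.Bootstrap RPC, is committed to Raft, and is therefore
// replicated to every member, unlike mutating servers[0].State() directly.
func bootstrapViaRPC(t *testing.T, servers []*Server) *structs.ACLToken {
	t.Helper()

	req := &structs.ACLTokenBootstrapRequest{
		Token:        new(structs.ACLToken),
		WriteRequest: structs.WriteRequest{Region: servers[0].config.Region},
	}
	resp := &structs.ACLTokenUpsertResponse{}

	// The RPC may be issued against any member; it is forwarded to the leader.
	if err := servers[0].RPC("ACL.Bootstrap", req, resp); err != nil {
		t.Fatalf("failed to bootstrap ACL token: %v", err)
	}
	return resp.Tokens[0]
}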
@@ -14,6 +14,7 @@ import (
 	"path"
 	"reflect"
 	"strings"
+	"sync"
 	"testing"
 	"time"

@@ -378,10 +379,10 @@ func TestOperator_RaftRemovePeerByID_ACL(t *testing.T) {

 type testcluster struct {
 	t       *testing.T
+	args    tcArgs
 	server  []*Server
 	cleanup []func()
 	token   *structs.ACLToken
 	rpc     func(string, any, any) error
 }

 func (tc testcluster) Cleanup() {
@@ -406,6 +407,7 @@ func newTestCluster(t *testing.T, args tcArgs) (tc testcluster) {
 	cSize := args.size
 	out := testcluster{
 		t:       t,
+		args:    args,
 		server:  make([]*Server, cSize),
 		cleanup: make([]func(), cSize),
 	}
@@ -419,26 +421,46 @@ func newTestCluster(t *testing.T, args tcArgs) (tc testcluster) {
 		})
 	}
 	t.Cleanup(out.Cleanup)
 	out.rpc = out.server[0].RPC

 	TestJoin(t, out.server...)
 	out.WaitForLeader()

 	if args.enableACL {
+		s1 := out.server[0]
+		bsToken := new(structs.ACLToken)
 		// Bootstrap the ACL subsystem
-		token := mock.ACLManagementToken()
-		err := out.server[0].State().BootstrapACLTokens(structs.MsgTypeTestSetup, 1, 0, token)
+		req := &structs.ACLTokenBootstrapRequest{
+			Token:        bsToken,
+			WriteRequest: structs.WriteRequest{Region: s1.config.Region},
+		}
+		resp := &structs.ACLTokenUpsertResponse{}
+		err := out.server[0].RPC("ACL.Bootstrap", req, resp)
 		if err != nil {
 			t.Fatalf("failed to bootstrap ACL token: %v", err)
 		}
-		t.Logf("bootstrap token: %v", *token)
-		out.token = token
+		t.Logf("bootstrap token: %v", *resp.Tokens[0])
+		out.token = resp.Tokens[0]
 	}
 	return out
 }

+// WaitForLeader performs a parallel WaitForLeader over each cluster member,
+// because testutil doesn't export rpcFn so we can't create a collection of
+// rpcFn to use testutil.WaitForLeaders directly.
 func (tc testcluster) WaitForLeader() {
-	testutil.WaitForLeader(tc.t, tc.rpc)
+	var wg sync.WaitGroup
+	for i := 0; i < len(tc.server); i++ {
+		idx := i
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			// The WaitForLeader func uses WaitForResultRetries
+			// so this should timeout at 5 seconds * test multiplier
+			testutil.WaitForLeader(tc.t, tc.server[idx].RPC)
+		}()
+	}
+	wg.Wait()
 }

 func (tc testcluster) leader() *Server {
@@ -450,33 +472,35 @@ func (tc testcluster) leader() *Server {
 	}
 	return nil
 }

+func (tc testcluster) anyFollowerRaftServerID() raft.ServerID {
+	tc.WaitForLeader()
+	s1 := tc.server[0]
+	_, ldrID := s1.raft.LeaderWithID()
+
-func (tc testcluster) anyFollower() *Server {
-	if len(tc.server) < 2 {
-		return nil
-	}
+	var tgtID raft.ServerID

-	testutil.WaitForLeader(tc.t, tc.rpc)
-	for _, s := range tc.server {
-		if isLeader, _ := s.getLeader(); !isLeader {
-			return s
+	s1.peerLock.Lock()
+	defer s1.peerLock.Unlock()
+
+	// Find the first non-leader server in the list.
+	for _, sp := range s1.localPeers {
+		tgtID = raft.ServerID(sp.ID)
+		if tgtID != ldrID {
+			break
 		}
 	}
-	// something weird happened.
-	return nil
+	return tgtID
 }

-func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) {
-	ci.Parallel(t)
-
-	tc := newTestCluster(t, tcArgs{enableACL: true})
-	s1 := tc.leader()
-	codec := rpcClient(t, s1)
-	state := s1.fsm.State()
-
+func (tc testcluster) anyFollowerRaftServerAddress() raft.ServerAddress {
+	tc.WaitForLeader()
+	s1 := tc.server[0]
+	lAddr, _ := s1.raft.LeaderWithID()
+
+	var addr raft.ServerAddress
+
+	s1.peerLock.Lock()
+	defer s1.peerLock.Unlock()
+
+	// Find the first non-leader server in the list.
+	for a := range s1.localPeers {
+		addr = a
@@ -484,36 +508,83 @@ func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) {
 			break
 		}
 	}
+	return addr
+}

-	// Create ACL token
-	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))
+func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) {
+	ci.Parallel(t)
+	var err error

-	arg := &structs.RaftPeerRequest{
+	tc := newTestCluster(t, tcArgs{enableACL: true})
+	s1 := tc.leader()
+	must.NotNil(t, s1)
+	codec := rpcClient(t, s1)
+
+	addr := tc.anyFollowerRaftServerAddress()
+
+	mgmtWR := structs.WriteRequest{
+		Region:    s1.config.Region,
+		AuthToken: tc.token.SecretID,
+	}
+
+	// Create invalid ACL Token
+	pReq := &structs.ACLPolicyUpsertRequest{
+		Policies: []*structs.ACLPolicy{
+			{
+				Name:  "node-write-only",
+				Rules: `node { policy = "write" }`,
+			},
+		},
+		WriteRequest: mgmtWR,
+	}
+	pResp := &structs.GenericResponse{}
+	err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertPoliciesRPCMethod, pReq, pResp)
+	must.NoError(t, err)
+
+	tReq := &structs.ACLTokenUpsertRequest{
+		Tokens: []*structs.ACLToken{
+			{
+				Name:     "invalid",
+				Policies: []string{"node_write_only"},
+				Type:     structs.ACLClientToken,
+			},
+		},
+		WriteRequest: mgmtWR,
+	}
+	tResp := &structs.ACLTokenUpsertResponse{}
+	err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertTokensRPCMethod, tReq, tResp)
+	must.NoError(t, err)
+
+	invalidToken := tResp.Tokens[0]
+
+	testReq := &structs.RaftPeerRequest{
 		RaftIDAddress: structs.RaftIDAddress{Address: addr},
-		WriteRequest: structs.WriteRequest{Region: s1.config.Region},
+		WriteRequest: structs.WriteRequest{
+			Region: s1.config.Region,
+		},
 	}

 	var reply struct{}

 	t.Run("no-token", func(t *testing.T) {
 		// Try with no token and expect permission denied
-		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply)
+		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply)
 		must.Error(t, err)
 		must.ErrorIs(t, err, rpcPermDeniedErr)
 	})

 	t.Run("invalid-token", func(t *testing.T) {
 		// Try with an invalid token and expect permission denied
-		arg.AuthToken = invalidToken.SecretID
-		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply)
+		testReq.AuthToken = invalidToken.SecretID
+		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply)
 		must.Error(t, err)
 		must.ErrorIs(t, err, rpcPermDeniedErr)
 	})

 	t.Run("good-token", func(t *testing.T) {
 		// Try with a management token
-		arg.AuthToken = tc.token.SecretID
-		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply)
+		testReq.AuthToken = tc.token.SecretID
+		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply)
 		must.NoError(t, err)

 		// Is the expected leader the new one?
@@ -525,55 +596,76 @@ func TestOperator_TransferLeadershipToServerAddress_ACL(t *testing.T) {

 func TestOperator_TransferLeadershipToServerID_ACL(t *testing.T) {
 	ci.Parallel(t)
+	var err error

 	tc := newTestCluster(t, tcArgs{enableACL: true})
 	s1 := tc.leader()
+	must.NotNil(t, s1)
 	codec := rpcClient(t, s1)
-	state := s1.fsm.State()

-	_, ldrID := s1.raft.LeaderWithID()
-
-	var tgtID raft.ServerID
-	// Find the first non-leader server in the list.
-	s1.peerLock.Lock()
-	for _, sp := range s1.localPeers {
-		tgtID = raft.ServerID(sp.ID)
-		if tgtID != ldrID {
-			break
-		}
+	mgmtWR := structs.WriteRequest{
+		Region:    s1.config.Region,
+		AuthToken: tc.token.SecretID,
 	}
-	s1.peerLock.Unlock()

-	// Create ACL token
-	invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", mock.NodePolicy(acl.PolicyWrite))
+	// Create invalid ACL Token
+	pReq := &structs.ACLPolicyUpsertRequest{
+		Policies: []*structs.ACLPolicy{
+			{
+				Name:  "node-write-only",
+				Rules: `node { policy = "write" }`,
+			},
+		},
+		WriteRequest: mgmtWR,
+	}
+	pResp := &structs.GenericResponse{}
+	err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertPoliciesRPCMethod, pReq, pResp)
+	must.NoError(t, err)

-	arg := &structs.RaftPeerRequest{
+	tReq := &structs.ACLTokenUpsertRequest{
+		Tokens: []*structs.ACLToken{
+			{
+				Name:     "invalid",
+				Policies: []string{"node_write_only"},
+				Type:     structs.ACLClientToken,
+			},
+		},
+		WriteRequest: mgmtWR,
+	}
+	tResp := &structs.ACLTokenUpsertResponse{}
+	err = msgpackrpc.CallWithCodec(codec, structs.ACLUpsertTokensRPCMethod, tReq, tResp)
+	must.NoError(t, err)
+
+	invalidToken := tResp.Tokens[0]
+
+	tgtID := tc.anyFollowerRaftServerID()
+	testReq := &structs.RaftPeerRequest{
 		RaftIDAddress: structs.RaftIDAddress{
 			ID: tgtID,
 		},
 		WriteRequest: structs.WriteRequest{Region: s1.config.Region},
 	}

 	var reply struct{}

 	t.Run("no-token", func(t *testing.T) {
 		// Try with no token and expect permission denied
-		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply)
+		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply)
 		must.Error(t, err)
 		must.ErrorIs(t, err, rpcPermDeniedErr)
 	})

 	t.Run("invalid-token", func(t *testing.T) {
 		// Try with an invalid token and expect permission denied
-		arg.AuthToken = invalidToken.SecretID
-		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply)
+		testReq.AuthToken = invalidToken.SecretID
+		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply)
 		must.Error(t, err)
 		must.ErrorIs(t, err, rpcPermDeniedErr)
 	})

 	t.Run("good-token", func(t *testing.T) {
 		// Try with a management token
-		arg.AuthToken = tc.token.SecretID
-		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", arg, &reply)
+		testReq.AuthToken = tc.token.SecretID
+		err := msgpackrpc.CallWithCodec(codec, "Operator.TransferLeadershipToPeer", testReq, &reply)
 		must.NoError(t, err)

 		// Is the expected leader the new one?