nomad: adding global token replication

This commit is contained in:
Armon Dadgar
2017-08-13 16:45:13 -07:00
parent 7614c8fdb5
commit e2ed029cd4
2 changed files with 203 additions and 0 deletions

View File

@@ -807,10 +807,136 @@ func diffACLPolicies(state *state.StateStore, minIndex uint64, remoteList []*str
// replicateACLTokens is used to replicate global ACL tokens from
// the authoritative region to this region.
func (s *Server) replicateACLTokens(stopCh chan struct{}) {
req := structs.ACLTokenListRequest{
GlobalOnly: true,
}
req.Region = s.config.AuthoritativeRegion
limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
s.logger.Printf("[DEBUG] nomad: starting ACL token replication from authoritative region '%s'", req.Region)
START:
for {
select {
case <-stopCh:
return
default:
// Rate limit how often we attempt replication
limiter.Wait(context.Background())
// Fetch the list of tokens
var resp structs.ACLTokenListResponse
err := s.forwardRegion(s.config.AuthoritativeRegion,
"ACL.ListTokens", &req, &resp)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to fetch tokens from authoritative region: %v", err)
goto ERR_WAIT
}
// Perform a two-way diff
delete, update := diffACLTokens(s.State(), req.MinQueryIndex, resp.Tokens)
// Delete tokens that should not exist
if len(delete) > 0 {
args := &structs.ACLTokenDeleteRequest{
AccessorIDs: delete,
}
_, _, err := s.raftApply(structs.ACLTokenDeleteRequestType, args)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to delete tokens: %v", err)
goto ERR_WAIT
}
}
// Fetch any outdated policies
var fetched []*structs.ACLToken
for _, tokenID := range update {
req := structs.ACLTokenSpecificRequest{
AccessorID: tokenID,
}
req.Region = s.config.AuthoritativeRegion
var reply structs.SingleACLTokenResponse
err := s.forwardRegion(s.config.AuthoritativeRegion,
"ACL.GetToken", &req, &reply)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to fetch token '%s' from authoritative region: %v", tokenID, err)
goto ERR_WAIT
}
if reply.Token != nil {
fetched = append(fetched, reply.Token)
}
}
// Update local tokensj
if len(fetched) > 0 {
args := &structs.ACLTokenUpsertRequest{
Tokens: fetched,
}
_, _, err := s.raftApply(structs.ACLTokenUpsertRequestType, args)
if err != nil {
s.logger.Printf("[ERR] nomad: failed to update tokens: %v", err)
goto ERR_WAIT
}
}
// Update the minimum query index, blocks until there
// is a change.
req.MinQueryIndex = resp.Index
}
}
ERR_WAIT:
select {
case <-time.After(s.config.ReplicationBackoff):
goto START
case <-stopCh:
return
}
}
// diffACLTokens is used to perform a two-way diff between the local
// tokens and the remote tokens to determine which tokens need to
// be deleted or updated.
func diffACLTokens(state *state.StateStore, minIndex uint64, remoteList []*structs.ACLTokenListStub) (delete []string, update []string) {
// Construct a set of the local and remote policies
local := make(map[string]struct{})
remote := make(map[string]struct{})
// Add all the local global tokens
iter, err := state.ACLTokensByGlobal(nil, true)
if err != nil {
panic("failed to iterate local tokens")
}
for {
raw := iter.Next()
if raw == nil {
break
}
token := raw.(*structs.ACLToken)
local[token.AccessorID] = struct{}{}
}
// Iterate over the remote tokens
for _, rp := range remoteList {
remote[rp.AccessorID] = struct{}{}
// Check if the token is missing locally
if _, ok := local[rp.AccessorID]; !ok {
update = append(update, rp.AccessorID)
// Check if token is newer remotely
// TODO: Eventually would be nice to use an object
// hash or something to avoid fetching tokens that
// are unchanged.
} else if rp.ModifyIndex > minIndex {
update = append(update, rp.AccessorID)
}
}
// Check if local token should be deleted
for lp := range local {
if _, ok := remote[lp]; !ok {
delete = append(delete, lp)
}
}
return
}

View File

@@ -696,3 +696,80 @@ func TestLeader_DiffACLPolicies(t *testing.T) {
// P2 is un-modified - ignore. P3 modified, P4 new.
assert.Equal(t, []string{p3.Name, p4.Name}, update)
}
func TestLeader_ReplicateACLTokens(t *testing.T) {
t.Parallel()
s1 := testServer(t, func(c *Config) {
c.Region = "region1"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
})
defer s1.Shutdown()
s2 := testServer(t, func(c *Config) {
c.Region = "region2"
c.AuthoritativeRegion = "region1"
c.ACLEnabled = true
c.ReplicationBackoff = 20 * time.Millisecond
})
defer s2.Shutdown()
testJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
// Write a token to the authoritative region
p1 := mock.ACLToken()
p1.Global = true
if err := s1.State().UpsertACLTokens(100, []*structs.ACLToken{p1}); err != nil {
t.Fatalf("bad: %v", err)
}
// Wait for the token to replicate
testutil.WaitForResult(func() (bool, error) {
state := s2.State()
out, err := state.ACLTokenByAccessorID(nil, p1.AccessorID)
return out != nil, err
}, func(err error) {
t.Fatalf("should replicate token")
})
}
func TestLeader_DiffACLTokens(t *testing.T) {
t.Parallel()
state, err := state.NewStateStore(os.Stderr)
if err != nil {
t.Fatalf("err: %v", err)
}
// Populate the local state
p0 := mock.ACLToken()
p1 := mock.ACLToken()
p1.Global = true
p2 := mock.ACLToken()
p2.Global = true
p3 := mock.ACLToken()
p3.Global = true
err = state.UpsertACLTokens(100, []*structs.ACLToken{p0, p1, p2, p3})
assert.Nil(t, err)
// Simulate a remote list
p2Stub := p2.Stub()
p2Stub.ModifyIndex = 50 // Ignored, same index
p3Stub := p3.Stub()
p3Stub.ModifyIndex = 100 // Updated, higher index
p4 := mock.ACLToken()
p4.Global = true
remoteList := []*structs.ACLTokenListStub{
p2Stub,
p3Stub,
p4.Stub(),
}
delete, update := diffACLTokens(state, 50, remoteList)
// P0 is local and should be ignored
// P1 does not exist on the remote side, should delete
assert.Equal(t, []string{p1.AccessorID}, delete)
// P2 is un-modified - ignore. P3 modified, P4 new.
assert.Equal(t, []string{p3.AccessorID, p4.AccessorID}, update)
}