From e2ed029cd418aae4470964388b847f49e461ed2c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Sun, 13 Aug 2017 16:45:13 -0700 Subject: [PATCH] nomad: adding global token replication --- nomad/leader.go | 126 +++++++++++++++++++++++++++++++++++++++++++ nomad/leader_test.go | 77 ++++++++++++++++++++++++++ 2 files changed, 203 insertions(+) diff --git a/nomad/leader.go b/nomad/leader.go index 56bc8e062..70e13105b 100644 --- a/nomad/leader.go +++ b/nomad/leader.go @@ -807,10 +807,136 @@ func diffACLPolicies(state *state.StateStore, minIndex uint64, remoteList []*str // replicateACLTokens is used to replicate global ACL tokens from // the authoritative region to this region. func (s *Server) replicateACLTokens(stopCh chan struct{}) { + req := structs.ACLTokenListRequest{ + GlobalOnly: true, + } + req.Region = s.config.AuthoritativeRegion + limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit)) + s.logger.Printf("[DEBUG] nomad: starting ACL token replication from authoritative region '%s'", req.Region) + +START: for { select { case <-stopCh: return + default: + // Rate limit how often we attempt replication + limiter.Wait(context.Background()) + + // Fetch the list of tokens + var resp structs.ACLTokenListResponse + err := s.forwardRegion(s.config.AuthoritativeRegion, + "ACL.ListTokens", &req, &resp) + if err != nil { + s.logger.Printf("[ERR] nomad: failed to fetch tokens from authoritative region: %v", err) + goto ERR_WAIT + } + + // Perform a two-way diff + delete, update := diffACLTokens(s.State(), req.MinQueryIndex, resp.Tokens) + + // Delete tokens that should not exist + if len(delete) > 0 { + args := &structs.ACLTokenDeleteRequest{ + AccessorIDs: delete, + } + _, _, err := s.raftApply(structs.ACLTokenDeleteRequestType, args) + if err != nil { + s.logger.Printf("[ERR] nomad: failed to delete tokens: %v", err) + goto ERR_WAIT + } + } + + // Fetch any outdated tokens + var fetched []*structs.ACLToken + for _, tokenID := range update { + req 
:= structs.ACLTokenSpecificRequest{ + AccessorID: tokenID, + } + req.Region = s.config.AuthoritativeRegion + var reply structs.SingleACLTokenResponse + err := s.forwardRegion(s.config.AuthoritativeRegion, + "ACL.GetToken", &req, &reply) + if err != nil { + s.logger.Printf("[ERR] nomad: failed to fetch token '%s' from authoritative region: %v", tokenID, err) + goto ERR_WAIT + } + if reply.Token != nil { + fetched = append(fetched, reply.Token) + } + } + + // Update local tokens + if len(fetched) > 0 { + args := &structs.ACLTokenUpsertRequest{ + Tokens: fetched, + } + _, _, err := s.raftApply(structs.ACLTokenUpsertRequestType, args) + if err != nil { + s.logger.Printf("[ERR] nomad: failed to update tokens: %v", err) + goto ERR_WAIT + } + } + + // Update the minimum query index, blocks until there + // is a change. + req.MinQueryIndex = resp.Index } } + +ERR_WAIT: + select { + case <-time.After(s.config.ReplicationBackoff): + goto START + case <-stopCh: + return + } +} + +// diffACLTokens is used to perform a two-way diff between the local +// tokens and the remote tokens to determine which tokens need to +// be deleted or updated. 
+func diffACLTokens(state *state.StateStore, minIndex uint64, remoteList []*structs.ACLTokenListStub) (delete []string, update []string) { + // Construct a set of the local and remote tokens + local := make(map[string]struct{}) + remote := make(map[string]struct{}) + + // Add all the local global tokens + iter, err := state.ACLTokensByGlobal(nil, true) + if err != nil { + panic("failed to iterate local tokens") + } + for { + raw := iter.Next() + if raw == nil { + break + } + token := raw.(*structs.ACLToken) + local[token.AccessorID] = struct{}{} + } + + // Iterate over the remote tokens + for _, rp := range remoteList { + remote[rp.AccessorID] = struct{}{} + + // Check if the token is missing locally + if _, ok := local[rp.AccessorID]; !ok { + update = append(update, rp.AccessorID) + + // Check if token is newer remotely + // TODO: Eventually would be nice to use an object + // hash or something to avoid fetching tokens that + // are unchanged. + } else if rp.ModifyIndex > minIndex { + update = append(update, rp.AccessorID) + } + } + + // Check if local token should be deleted + for lp := range local { + if _, ok := remote[lp]; !ok { + delete = append(delete, lp) + } + } + return } diff --git a/nomad/leader_test.go b/nomad/leader_test.go index 5430865bd..3dab4440b 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -696,3 +696,80 @@ func TestLeader_DiffACLPolicies(t *testing.T) { // P2 is un-modified - ignore. P3 modified, P4 new. 
assert.Equal(t, []string{p3.Name, p4.Name}, update) } + +func TestLeader_ReplicateACLTokens(t *testing.T) { + t.Parallel() + s1 := testServer(t, func(c *Config) { + c.Region = "region1" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + }) + defer s1.Shutdown() + s2 := testServer(t, func(c *Config) { + c.Region = "region2" + c.AuthoritativeRegion = "region1" + c.ACLEnabled = true + c.ReplicationBackoff = 20 * time.Millisecond + }) + defer s2.Shutdown() + testJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + + // Write a token to the authoritative region + p1 := mock.ACLToken() + p1.Global = true + if err := s1.State().UpsertACLTokens(100, []*structs.ACLToken{p1}); err != nil { + t.Fatalf("bad: %v", err) + } + + // Wait for the token to replicate + testutil.WaitForResult(func() (bool, error) { + state := s2.State() + out, err := state.ACLTokenByAccessorID(nil, p1.AccessorID) + return out != nil, err + }, func(err error) { + t.Fatalf("should replicate token") + }) +} + +func TestLeader_DiffACLTokens(t *testing.T) { + t.Parallel() + + state, err := state.NewStateStore(os.Stderr) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Populate the local state + p0 := mock.ACLToken() + p1 := mock.ACLToken() + p1.Global = true + p2 := mock.ACLToken() + p2.Global = true + p3 := mock.ACLToken() + p3.Global = true + err = state.UpsertACLTokens(100, []*structs.ACLToken{p0, p1, p2, p3}) + assert.Nil(t, err) + + // Simulate a remote list + p2Stub := p2.Stub() + p2Stub.ModifyIndex = 50 // Ignored, same index + p3Stub := p3.Stub() + p3Stub.ModifyIndex = 100 // Updated, higher index + p4 := mock.ACLToken() + p4.Global = true + remoteList := []*structs.ACLTokenListStub{ + p2Stub, + p3Stub, + p4.Stub(), + } + delete, update := diffACLTokens(state, 50, remoteList) + + // P0 is local and should be ignored + // P1 does not exist on the remote side, should delete + assert.Equal(t, []string{p1.AccessorID}, delete) + + // P2 is un-modified 
- ignore. P3 modified, P4 new. + assert.Equal(t, []string{p3.AccessorID, p4.AccessorID}, update) +}