From 5bb6e5758defe37e260dc6b7362c5ecbb447fc05 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Mon, 12 Jun 2023 13:24:24 -0400
Subject: [PATCH] node pools: replicate from authoritative region (#17456)

Upserts and deletes of node pools are forwarded to the authoritative
region, just like we do for namespaces, quotas, ACL policies, etc.
Replicate node pools from the authoritative region.
---
 command/agent/node_pool_endpoint_test.go |   5 +
 nomad/leader.go                          | 141 +++++++++++++++++++++++
 nomad/leader_test.go                     |  82 +++++++++++++
 nomad/mock/mock.go                       |   4 +-
 nomad/node_pool_endpoint.go              |   2 +
 nomad/node_pool_endpoint_test.go         |   2 +
 nomad/structs/node_pool.go               |  44 +++++++
 7 files changed, 279 insertions(+), 1 deletion(-)

diff --git a/command/agent/node_pool_endpoint_test.go b/command/agent/node_pool_endpoint_test.go
index ce0843374..1e744fcb6 100644
--- a/command/agent/node_pool_endpoint_test.go
+++ b/command/agent/node_pool_endpoint_test.go
@@ -73,6 +73,7 @@ func TestHTTP_NodePool_Info(t *testing.T) {
 		// Verify expected pool is returned.
 		must.Eq(t, pool, obj.(*structs.NodePool), must.Cmp(cmpopts.IgnoreFields(
 			structs.NodePool{},
+			"Hash",
 			"CreateIndex",
 			"ModifyIndex",
 		)))
@@ -143,6 +144,7 @@ func TestHTTP_NodePool_Create(t *testing.T) {
 		must.NoError(t, err)
 		must.Eq(t, pool, got, must.Cmp(cmpopts.IgnoreFields(
 			structs.NodePool{},
+			"Hash",
 			"CreateIndex",
 			"ModifyIndex",
 		)))
@@ -193,6 +195,7 @@ func TestHTTP_NodePool_Update(t *testing.T) {
 		must.NoError(t, err)
 		must.Eq(t, updated, got, must.Cmp(cmpopts.IgnoreFields(
 			structs.NodePool{},
+			"Hash",
 			"CreateIndex",
 			"ModifyIndex",
 		)))
@@ -239,6 +242,7 @@ func TestHTTP_NodePool_Update(t *testing.T) {
 		must.NoError(t, err)
 		must.Eq(t, updated, got, must.Cmp(cmpopts.IgnoreFields(
 			structs.NodePool{},
+			"Hash",
 			"CreateIndex",
 			"ModifyIndex",
 		)))
@@ -278,6 +282,7 @@ func TestHTTP_NodePool_Update(t *testing.T) {
 		must.NoError(t, err)
 		must.Eq(t, pool, got, must.Cmp(cmpopts.IgnoreFields(
 			structs.NodePool{},
+			"Hash",
 			"CreateIndex",
 			"ModifyIndex",
 		)))
diff --git a/nomad/leader.go b/nomad/leader.go
index c94d51878..d8898b690 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -429,6 +429,7 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 			go s.replicateACLAuthMethods(stopCh)
 			go s.replicateACLBindingRules(stopCh)
 			go s.replicateNamespaces(stopCh)
+			go s.replicateNodePools(stopCh)
 		}
 	}
 
@@ -600,6 +601,146 @@ func diffNamespaces(state *state.StateStore, minIndex uint64, remoteList []*stru
 	return
 }
 
+// replicateNodePools is used to replicate node pools from the authoritative
+// region to this region.
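+//
+// It runs on the leader as a rate-limited loop: each pass issues a blocking
+// NodePool.List query against the authoritative region using the replication
+// token, diffs the result against local state, and applies any required
+// deletes and upserts via Raft, backing off and retrying on error.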
+func (s *Server) replicateNodePools(stopCh chan struct{}) {
+	req := structs.NodePoolListRequest{
+		QueryOptions: structs.QueryOptions{
+			Region:     s.config.AuthoritativeRegion,
+			AllowStale: true,
+		},
+	}
+	limiter := rate.NewLimiter(replicationRateLimit, int(replicationRateLimit))
+	s.logger.Debug("starting node pool replication from authoritative region", "region", req.Region)
+
+	for {
+		select {
+		case <-stopCh:
+			return
+		default:
+		}
+
+		// Rate limit how often we attempt replication
+		limiter.Wait(context.Background())
+
+		if !ServersMeetMinimumVersion(
+			s.serf.Members(), s.Region(), minNodePoolsVersion, true) {
+			s.logger.Trace(
+				"all servers must be upgraded to 1.6.0 before Node Pools can be replicated")
+			if s.replicationBackoffContinue(stopCh) {
+				continue
+			} else {
+				return
+			}
+		}
+
+		var resp structs.NodePoolListResponse
+		req.AuthToken = s.ReplicationToken()
+		err := s.forwardRegion(s.config.AuthoritativeRegion, "NodePool.List", &req, &resp)
+		if err != nil {
+			s.logger.Error("failed to fetch node pools from authoritative region", "error", err)
+			if s.replicationBackoffContinue(stopCh) {
+				continue
+			} else {
+				return
+			}
+		}
+
+		// Perform a two-way diff
+		delete, update := diffNodePools(s.State(), req.MinQueryIndex, resp.NodePools)
+
+		// A significant amount of time could pass between the last check
+		// on whether we should stop the replication process. Therefore, do
+		// a check here, before calling Raft.
+		select {
+		case <-stopCh:
+			return
+		default:
+		}
+
+		// Delete node pools that should not exist
+		if len(delete) > 0 {
+			args := &structs.NodePoolDeleteRequest{
+				Names: delete,
+			}
+			_, _, err := s.raftApply(structs.NodePoolDeleteRequestType, args)
+			if err != nil {
+				s.logger.Error("failed to delete node pools", "error", err)
+				if s.replicationBackoffContinue(stopCh) {
+					continue
+				} else {
+					return
+				}
+			}
+		}
+
+		// Update local node pools
+		if len(update) > 0 {
+			args := &structs.NodePoolUpsertRequest{
+				NodePools: update,
+			}
+			_, _, err := s.raftApply(structs.NodePoolUpsertRequestType, args)
+			if err != nil {
+				s.logger.Error("failed to update node pools", "error", err)
+				if s.replicationBackoffContinue(stopCh) {
+					continue
+				} else {
+					return
+				}
+			}
+		}
+
+		// Update the minimum query index, blocks until there is a change.
+		req.MinQueryIndex = resp.Index
+	}
+}
+
+// diffNodePools is used to perform a two-way diff between the local node pools
+// and the remote node pools to determine which node pools need to be deleted or
+// updated.
+func diffNodePools(store *state.StateStore, minIndex uint64, remoteList []*structs.NodePool) (delete []string, update []*structs.NodePool) {
+	// Construct a set of the local and remote node pools
+	local := make(map[string][]byte)
+	remote := make(map[string]struct{})
+
+	// Add all the local node pools
+	iter, err := store.NodePools(nil, state.SortDefault)
+	if err != nil {
+		panic("failed to iterate local node pools")
+	}
+	for {
+		raw := iter.Next()
+		if raw == nil {
+			break
+		}
+		pool := raw.(*structs.NodePool)
+		local[pool.Name] = pool.Hash
+	}
+
+	for _, rnp := range remoteList {
+		remote[rnp.Name] = struct{}{}
+
+		if localHash, ok := local[rnp.Name]; !ok {
+			// Node pools that are missing locally should be added
+			update = append(update, rnp)
+
+		} else if rnp.ModifyIndex > minIndex && !bytes.Equal(localHash, rnp.Hash) {
+			// Node pools that have been added/updated more recently than the
+			// last index we saw, and have a hash mismatch with what we have
+			// locally, should be updated.
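+			// The hash comparison matters because Raft indexes are not
+			// comparable across regions: a pool whose remote ModifyIndex has
+			// advanced but whose content still matches the local copy is
+			// skipped rather than re-upserted.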
+			update = append(update, rnp)
+		}
+	}
+
+	// Node pools that don't exist on the remote should be deleted
+	for lnp := range local {
+		if _, ok := remote[lnp]; !ok {
+			delete = append(delete, lnp)
+		}
+	}
+	return
+}
+
 // restoreEvals is used to restore pending evaluations into the eval broker and
 // blocked evaluations into the blocked eval tracker. The broker and blocked
 // eval tracker is maintained only by the leader, so it must be restored anytime
diff --git a/nomad/leader_test.go b/nomad/leader_test.go
index 35c289ebb..3b9267d88 100644
--- a/nomad/leader_test.go
+++ b/nomad/leader_test.go
@@ -14,6 +14,7 @@ import (
 	"github.com/hashicorp/go-hclog"
 	memdb "github.com/hashicorp/go-memdb"
 	"github.com/hashicorp/go-version"
+	"github.com/shoenig/test"
 	"github.com/shoenig/test/must"
 	"github.com/shoenig/test/wait"
 	"github.com/stretchr/testify/assert"
@@ -1806,6 +1807,87 @@ func TestLeader_DiffNamespaces(t *testing.T) {
 	assert.Equal(t, []string{ns3.Name, ns4.Name}, update)
 }
 
+func TestLeader_ReplicateNodePools(t *testing.T) {
+	ci.Parallel(t)
+
+	s1, root, cleanupS1 := TestACLServer(t, func(c *Config) {
+		c.Region = "region1"
+		c.AuthoritativeRegion = "region1"
+		c.ACLEnabled = true
+	})
+	defer cleanupS1()
+	s2, _, cleanupS2 := TestACLServer(t, func(c *Config) {
+		c.Region = "region2"
+		c.AuthoritativeRegion = "region1"
+		c.ACLEnabled = true
+		c.ReplicationBackoff = 20 * time.Millisecond
+		c.ReplicationToken = root.SecretID
+	})
+	defer cleanupS2()
+	TestJoin(t, s1, s2)
+	testutil.WaitForLeader(t, s1.RPC)
+	testutil.WaitForLeader(t, s2.RPC)
+
+	// Write a node pool to the authoritative region
+	np1 := mock.NodePool()
+	must.NoError(t, s1.State().UpsertNodePools(
+		structs.MsgTypeTestSetup, 100, []*structs.NodePool{np1}))
+
+	// Wait for the node pool to replicate
+	testutil.WaitForResult(func() (bool, error) {
+		store := s2.State()
+		out, err := store.NodePoolByName(nil, np1.Name)
+		return out != nil, err
+	}, func(err error) {
+		t.Fatalf("should replicate node pool")
+	})
+
+	// Delete the node pool at the authoritative region
+	must.NoError(t, s1.State().DeleteNodePools(structs.MsgTypeTestSetup, 200, []string{np1.Name}))
+
+	// Wait for the node pool deletion to replicate
+	testutil.WaitForResult(func() (bool, error) {
+		store := s2.State()
+		out, err := store.NodePoolByName(nil, np1.Name)
+		return out == nil, err
+	}, func(err error) {
+		t.Fatalf("should replicate node pool deletion")
+	})
+}
+
+func TestLeader_DiffNodePools(t *testing.T) {
+	ci.Parallel(t)
+
+	state := state.TestStateStore(t)
+
+	// Populate the local state
+	np1, np2, np3 := mock.NodePool(), mock.NodePool(), mock.NodePool()
+	must.NoError(t, state.UpsertNodePools(
+		structs.MsgTypeTestSetup, 100, []*structs.NodePool{np1, np2, np3}))
+
+	// Simulate a remote list
+	rnp2 := np2.Copy()
+	rnp2.ModifyIndex = 50 // Ignored, same index
+	rnp3 := np3.Copy()
+	rnp3.ModifyIndex = 100 // Updated, higher index
+	rnp3.Description = "force a hash update"
+	rnp3.SetHash()
+	rnp4 := mock.NodePool()
+	remoteList := []*structs.NodePool{
+		rnp2,
+		rnp3,
+		rnp4,
+	}
+	delete, update := diffNodePools(state, 50, remoteList)
+	sort.Strings(delete)
+
+	// np1 does not exist on the remote side, should delete
+	test.Eq(t, []string{structs.NodePoolAll, structs.NodePoolDefault, np1.Name}, delete)
+
+	// np2 is un-modified - ignore. np3 modified, np4 new.
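+	// rnp3 is included because its content (and therefore its hash) changed,
+	// while rnp4 is new to the local region.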
+	test.Eq(t, []*structs.NodePool{rnp3, rnp4}, update)
+}
+
 // waitForStableLeadership waits until a leader is elected and all servers
 // get promoted as voting members, returns the leader
 func waitForStableLeadership(t *testing.T, servers []*Server) *Server {
diff --git a/nomad/mock/mock.go b/nomad/mock/mock.go
index 432ceea30..79dac82a2 100644
--- a/nomad/mock/mock.go
+++ b/nomad/mock/mock.go
@@ -252,7 +252,7 @@ func Namespace() *structs.Namespace {
 }
 
 func NodePool() *structs.NodePool {
-	return &structs.NodePool{
+	pool := &structs.NodePool{
 		Name:        fmt.Sprintf("pool-%s", uuid.Short()),
 		Description: "test node pool",
 		Meta:        map[string]string{"team": "test"},
@@ -260,6 +260,8 @@ func NodePool() *structs.NodePool {
 			SchedulerAlgorithm: structs.SchedulerAlgorithmSpread,
 		},
 	}
+	pool.SetHash()
+	return pool
 }
 
 // ServiceRegistrations generates an array containing two unique service
diff --git a/nomad/node_pool_endpoint.go b/nomad/node_pool_endpoint.go
index 06256e32c..5b2e50eed 100644
--- a/nomad/node_pool_endpoint.go
+++ b/nomad/node_pool_endpoint.go
@@ -203,6 +203,8 @@ func (n *NodePool) UpsertNodePools(args *structs.NodePoolUpsertRequest, reply *s
 		if pool.IsBuiltIn() {
 			return structs.NewErrRPCCodedf(http.StatusBadRequest,
 				"modifying node pool %q is not allowed", pool.Name)
 		}
+
+		pool.SetHash()
 	}
 
 	// Update via Raft.
diff --git a/nomad/node_pool_endpoint_test.go b/nomad/node_pool_endpoint_test.go
index 7d633f7d5..72781da1e 100644
--- a/nomad/node_pool_endpoint_test.go
+++ b/nomad/node_pool_endpoint_test.go
@@ -730,6 +730,7 @@ func TestNodePoolEndpoint_UpsertNodePools(t *testing.T) {
 			must.NoError(t, err)
 			must.Eq(t, pool, got, must.Cmp(cmpopts.IgnoreFields(
 				structs.NodePool{},
+				"Hash",
 				"CreateIndex",
 				"ModifyIndex",
 			)))
@@ -865,6 +866,7 @@ func TestNodePoolEndpoint_UpsertNodePool_ACL(t *testing.T) {
 			must.NoError(t, err)
 			must.Eq(t, pool, got, must.Cmp(cmpopts.IgnoreFields(
 				structs.NodePool{},
+				"Hash",
 				"CreateIndex",
 				"ModifyIndex",
 			)))
diff --git a/nomad/structs/node_pool.go b/nomad/structs/node_pool.go
index 7616ae4f0..39fc72046 100644
--- a/nomad/structs/node_pool.go
+++ b/nomad/structs/node_pool.go
@@ -6,8 +6,10 @@ package structs
 import (
 	"fmt"
 	"regexp"
+	"sort"
 
 	"github.com/hashicorp/go-multierror"
+	"golang.org/x/crypto/blake2b"
 	"golang.org/x/exp/maps"
 )
 
@@ -55,6 +57,10 @@ type NodePool struct {
 	// node pool.
 	SchedulerConfiguration *NodePoolSchedulerConfiguration
 
+	// Hash is the hash of the node pool which is used to efficiently diff when
+	// we replicate pools across regions.
+	Hash []byte
+
 	// Raft indexes.
 	CreateIndex uint64
 	ModifyIndex uint64
@@ -91,6 +97,9 @@ func (n *NodePool) Copy() *NodePool {
 	nc.Meta = maps.Clone(nc.Meta)
 	nc.SchedulerConfiguration = nc.SchedulerConfiguration.Copy()
 
+	nc.Hash = make([]byte, len(n.Hash))
+	copy(nc.Hash, n.Hash)
+
 	return nc
 }
 
@@ -107,6 +116,41 @@ func (n *NodePool) IsBuiltIn() bool {
 	}
 }
 
+// SetHash is used to compute and set the hash of node pool
+func (n *NodePool) SetHash() []byte {
+	// Initialize a 256bit Blake2 hash (32 bytes)
+	hash, err := blake2b.New256(nil)
+	if err != nil {
+		panic(err)
+	}
+
+	// Write all the user set fields
+	_, _ = hash.Write([]byte(n.Name))
+	_, _ = hash.Write([]byte(n.Description))
+	if n.SchedulerConfiguration != nil {
+		_, _ = hash.Write([]byte(n.SchedulerConfiguration.SchedulerAlgorithm))
+	}
+
+	// sort keys to ensure hash stability when meta is stored later
+	var keys []string
+	for k := range n.Meta {
+		keys = append(keys, k)
+	}
+	sort.Strings(keys)
+
+	for _, k := range keys {
+		_, _ = hash.Write([]byte(k))
+		_, _ = hash.Write([]byte(n.Meta[k]))
+	}
+
+	// Finalize the hash
+	hashVal := hash.Sum(nil)
+
+	// Set and return the hash
+	n.Hash = hashVal
+	return hashVal
+}
+
 // NodePoolSchedulerConfiguration is the scheduler confinguration applied to a
 // node pool.
 type NodePoolSchedulerConfiguration struct {