From 5732eb2cd51ea6632cdcbaf1a716a5497058ca93 Mon Sep 17 00:00:00 2001
From: Tim Gross
Date: Fri, 21 Oct 2022 12:33:16 -0400
Subject: [PATCH] keyring: fixes for keyring replication on cluster join (#14987)

* keyring: don't unblock early if rate limit burst exceeded

The rate limiter returns an error and unblocks early if its burst limit is
exceeded (unless the burst limit is Inf). Ensure we're not unblocking early,
otherwise we'll only slow down the cases where we're already pausing to make
external RPC requests.

* keyring: set MinQueryIndex on stale queries

When keyring replication makes a stale query to non-leader peers to find a
key the leader doesn't have, we need to make sure the peer we're querying
has had a chance to catch up to the most current index for that key.
Otherwise it's possible for newly-added servers to query another newly-added
server and get a non-error nil response for that key ID.

Ensure that we're setting the correct reply index in the blocking query.

Note that the "not found" case does not return an error, just an empty key.
So as a belt-and-suspenders, update the handling of empty responses so that
we don't break the loop early if we hit a server that doesn't have the key.

* test for adding new servers to keyring

* leader: initialize keyring after we have consistent reads

Wait until we're sure the FSM is current before we try to initialize the
keyring.

Also, if a key is rotated immediately following a leader election, plans
that are in-flight may get signed before the new leader has the key. Allow
for a short timeout-and-retry to avoid rejecting plans
---
 nomad/encrypter.go        | 39 ++++++++++++++++++++++++++++++++-------
 nomad/encrypter_test.go   | 29 +++++++++++++++++++++++++++--
 nomad/keyring_endpoint.go | 15 ++++++++++++++-
 nomad/leader.go           |  8 ++++----
 4 files changed, 77 insertions(+), 14 deletions(-)

diff --git a/nomad/encrypter.go b/nomad/encrypter.go
index e6e05bcce..f13dc84be 100644
--- a/nomad/encrypter.go
+++ b/nomad/encrypter.go
@@ -155,12 +155,33 @@ const keyIDHeader = "kid"
 // SignClaims signs the identity claim for the task and returns an
 // encoded JWT with both the claim and its signature
 func (e *Encrypter) SignClaims(claim *structs.IdentityClaims) (string, error) {
-	e.lock.RLock()
-	defer e.lock.RUnlock()
 
-	keyset, err := e.activeKeySetLocked()
+	getActiveKeyset := func() (*keyset, error) {
+		e.lock.RLock()
+		defer e.lock.RUnlock()
+		keyset, err := e.activeKeySetLocked()
+		return keyset, err
+	}
+
+	// If a key is rotated immediately following a leader election, plans
+	// that are in-flight may get signed before the new leader has the key.
Allow for + // a short timeout-and-retry to avoid rejecting plans + keyset, err := getActiveKeyset() if err != nil { - return "", err + ctx, cancel := context.WithTimeout(e.srv.shutdownCtx, 5*time.Second) + defer cancel() + for { + select { + case <-ctx.Done(): + return "", err + default: + time.Sleep(50 * time.Millisecond) + keyset, err = getActiveKeyset() + if keyset != nil { + break + } + } + } } token := jwt.NewWithClaims(&jwt.SigningMethodEd25519{}, claim) @@ -435,7 +456,10 @@ START: return default: // Rate limit how often we attempt replication - limiter.Wait(ctx) + err := limiter.Wait(ctx) + if err != nil { + goto ERR_WAIT // rate limit exceeded + } ws := store.NewWatchSet() iter, err := store.RootKeyMetas(ws) @@ -461,7 +485,8 @@ START: getReq := &structs.KeyringGetRootKeyRequest{ KeyID: keyID, QueryOptions: structs.QueryOptions{ - Region: krr.srv.config.Region, + Region: krr.srv.config.Region, + MinQueryIndex: keyMeta.ModifyIndex - 1, }, } getResp := &structs.KeyringGetRootKeyResponse{} @@ -479,7 +504,7 @@ START: getReq.AllowStale = true for _, peer := range krr.getAllPeers() { err = krr.srv.forwardServer(peer, "Keyring.Get", getReq, getResp) - if err == nil { + if err == nil && getResp.Key != nil { break } } diff --git a/nomad/encrypter_test.go b/nomad/encrypter_test.go index b006b503d..98d717586 100644 --- a/nomad/encrypter_test.go +++ b/nomad/encrypter_test.go @@ -123,8 +123,8 @@ func TestEncrypter_Restore(t *testing.T) { } } -// TestKeyringReplicator exercises key replication between servers -func TestKeyringReplicator(t *testing.T) { +// TestEncrypter_KeyringReplication exercises key replication between servers +func TestEncrypter_KeyringReplication(t *testing.T) { ci.Parallel(t) @@ -267,6 +267,31 @@ func TestKeyringReplicator(t *testing.T) { require.Eventually(t, checkReplicationFn(keyID3), time.Second*5, time.Second, "expected keys to be replicated to followers after election") + + // Scenario: new members join the cluster + + srv4, cleanupSRV4 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 0 + c.NumSchedulers = 0 + }) + defer cleanupSRV4() + srv5, cleanupSRV5 := TestServer(t, func(c *Config) { + c.BootstrapExpect = 0 + c.NumSchedulers = 0 + }) + defer cleanupSRV5() + + TestJoin(t, srv4, srv5) + TestJoin(t, srv5, srv1) + servers = []*Server{srv1, srv2, srv3, srv4, srv5} + + testutil.WaitForLeader(t, srv4.RPC) + testutil.WaitForLeader(t, srv5.RPC) + + require.Eventually(t, checkReplicationFn(keyID3), + time.Second*5, time.Second, + "expected new servers to get replicated keys") + } func TestEncrypter_EncryptDecrypt(t *testing.T) { diff --git a/nomad/keyring_endpoint.go b/nomad/keyring_endpoint.go index 78d4808ce..9b7e27ad9 100644 --- a/nomad/keyring_endpoint.go +++ b/nomad/keyring_endpoint.go @@ -264,7 +264,20 @@ func (k *Keyring) Get(args *structs.KeyringGetRootKeyRequest, reply *structs.Key Key: key, } reply.Key = rootKey - reply.Index = keyMeta.ModifyIndex + + // Use the last index that affected the policy table + index, err := s.Index(state.TableRootKeyMeta) + if err != nil { + return err + } + + // Ensure we never set the index to zero, otherwise a blocking query + // cannot be used. We floor the index at one, since realistically + // the first write must have a higher index. 
+			if index == 0 {
+				index = 1
+			}
+			reply.Index = index
 			return nil
 		},
 	}
diff --git a/nomad/leader.go b/nomad/leader.go
index 141b1df10..b6b2895fa 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -303,9 +303,6 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 	// Initialize scheduler configuration.
 	schedulerConfig := s.getOrCreateSchedulerConfig()
 
-	// Create the first root key if it doesn't already exist
-	go s.initializeKeyring(stopCh)
-
 	// Initialize the ClusterID
 	_, _ = s.ClusterID()
 	// todo: use cluster ID for stuff, later!
@@ -350,6 +347,9 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 
 	// Further clean ups and follow up that don't block RPC consistency
 
+	// Create the first root key if it doesn't already exist
+	go s.initializeKeyring(stopCh)
+
 	// Restore the periodic dispatcher state
 	if err := s.restorePeriodicDispatcher(); err != nil {
 		return err
@@ -2005,7 +2005,7 @@ func (s *Server) initializeKeyring(stopCh <-chan struct{}) {
 			break
 		}
 	}
-	// we might have lost leadershuip during the version check
+	// we might have lost leadership during the version check
 	if !s.IsLeader() {
 		return
 	}
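As background for the first bullet in the commit message, the sketch below is
not part of the patch and uses nothing from Nomad; it only demonstrates, with
the stock golang.org/x/time/rate package, one of the cases where Limiter.Wait
returns an error and unblocks immediately instead of pausing (here, a context
deadline shorter than the limiter's refill interval). This is why the
replication loop above checks the returned error rather than discarding it;
in the patch, that failure is routed to the existing ERR_WAIT backoff path.

// Standalone sketch: rate.Limiter.Wait can fail fast rather than block,
// so its error must be handled to preserve the intended rate limiting.
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Allow one event per second with a burst of one.
	limiter := rate.NewLimiter(rate.Limit(1), 1)

	// A deadline shorter than the limiter's refill interval: the second
	// call to Wait cannot be satisfied in time and returns an error
	// immediately instead of sleeping.
	ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer cancel()

	for i := 0; i < 3; i++ {
		if err := limiter.Wait(ctx); err != nil {
			// Without this check the loop would keep running at full
			// speed, defeating the rate limit entirely.
			fmt.Printf("attempt %d: wait failed: %v\n", i, err)
			return
		}
		fmt.Printf("attempt %d: allowed\n", i)
	}
}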