From 3fb788b45b5496c1ccc4ce32dee3310097c94bd8 Mon Sep 17 00:00:00 2001
From: Mahmood Ali
Date: Tue, 2 Jul 2019 15:58:02 +0800
Subject: [PATCH] Block RPC handling until state store is caught up

Here, we ensure that the leader only responds to RPC calls once its
state store is up to date. At a leadership transition, or at launch with
restored state, the server's local store might not have caught up with
the latest raft logs, so a read served from it could be stale.

The solution is an RPC consistency read gate: the leader answers RPC
calls only after `establishLeadership` completes, and
`establishLeadership` is itself gated by a `raft.Barrier`, which ensures
that all prior raft logs have been applied.

Conversely, the gate is disabled when leadership is lost.

This is very much inspired by
https://github.com/hashicorp/consul/pull/3154/files
---
 nomad/leader.go         |  4 ++++
 nomad/leader_test.go    | 19 +++++++++++++++++++
 nomad/rpc.go            |  8 ++++++--
 nomad/rpc_test.go       | 41 +++++++++++++++++++++++++++++++++++++++++
 nomad/server.go         | 18 ++++++++++++++++++
 nomad/structs/errors.go | 34 ++++++++++++++++++----------------
 6 files changed, 106 insertions(+), 18 deletions(-)

diff --git a/nomad/leader.go b/nomad/leader.go
index 0b08fc60c..e3e55d49f 100644
--- a/nomad/leader.go
+++ b/nomad/leader.go
@@ -289,6 +289,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
 		return err
 	}
 
+	s.setConsistentReadReady()
+
 	return nil
 }
 
@@ -714,6 +716,8 @@ func (s *Server) iterateJobSummaryMetrics(summary *structs.JobSummary) {
 func (s *Server) revokeLeadership() error {
 	defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now())
 
+	s.resetConsistentReadReady()
+
 	// Clear the leader token since we are no longer the leader.
 	s.setLeaderAcl("")
 
diff --git a/nomad/leader_test.go b/nomad/leader_test.go
index 7cb3a0ed2..19ad39df8 100644
--- a/nomad/leader_test.go
+++ b/nomad/leader_test.go
@@ -1126,6 +1126,25 @@ func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
 	require.Nil(t, s1.revokeLeadership())
 }
 
+func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
+	s1 := TestServer(t, nil)
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	testutil.WaitForResult(func() (bool, error) {
+		return s1.isReadyForConsistentReads(), nil
+	}, func(err error) {
+		require.Fail(t, "should have finished establish leader loop")
+	})
+
+	require.Nil(t, s1.revokeLeadership())
+	require.False(t, s1.isReadyForConsistentReads())
+
+	ch := make(chan struct{})
+	require.Nil(t, s1.establishLeadership(ch))
+	require.True(t, s1.isReadyForConsistentReads())
+}
+
 // Test doing an inplace upgrade on a server from raft protocol 2 to 3
 // This verifies that removing the server and adding it back with a uuid works
 // even if the server's address stays the same.
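For context, the ordering that makes the flag trustworthy comes from the leader loop: the `raft.Barrier` write completes before `establishLeadership` runs, so by the time `setConsistentReadReady` is called, every previously committed log has been applied to the local state store. A minimal sketch of that sequence, assuming a `raft *raft.Raft` field on `Server` and a `barrierWriteTimeout` constant (the real loop in nomad/leader.go has more steps):

```go
// Sketch only, not the actual Nomad leader loop: it shows why the gate
// may safely be opened after establishLeadership.
func (s *Server) leaderLoopSketch(stopCh chan struct{}) error {
	// Barrier blocks until all raft logs committed so far have been
	// applied to the FSM, i.e. the local state store is caught up.
	barrier := s.raft.Barrier(barrierWriteTimeout)
	if err := barrier.Error(); err != nil {
		return err
	}

	// Only now can establishLeadership call s.setConsistentReadReady();
	// reads served after this point cannot be stale.
	return s.establishLeadership(stopCh)
}
```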
diff --git a/nomad/rpc.go b/nomad/rpc.go
index ed662762a..e6ea18fcb 100644
--- a/nomad/rpc.go
+++ b/nomad/rpc.go
@@ -432,7 +432,7 @@ CHECK_LEADER:
 	isLeader, remoteServer := r.getLeader()
 
 	// Handle the case we are the leader
-	if isLeader {
+	if isLeader && r.Server.isReadyForConsistentReads() {
 		return false, nil
 	}
 
@@ -457,7 +457,11 @@
 		}
 	}
 
-	// No leader found and hold time exceeded
+	// Hold time exceeded without being ready to respond
+	if isLeader {
+		return true, structs.ErrNotReadyForConsistentReads
+	}
+
 	return true, structs.ErrNoLeader
 }
 
diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go
index a9413e525..b97667063 100644
--- a/nomad/rpc_test.go
+++ b/nomad/rpc_test.go
@@ -74,6 +74,47 @@ func TestRPC_forwardLeader(t *testing.T) {
 	}
 }
 
+func TestRPC_WaitForConsistentReads(t *testing.T) {
+	t.Parallel()
+	s1 := TestServer(t, func(c *Config) {
+		c.RPCHoldTimeout = 20 * time.Millisecond
+	})
+	defer s1.Shutdown()
+	testutil.WaitForLeader(t, s1.RPC)
+
+	isLeader, _ := s1.getLeader()
+	require.True(t, isLeader)
+	require.True(t, s1.isReadyForConsistentReads())
+
+	s1.resetConsistentReadReady()
+	require.False(t, s1.isReadyForConsistentReads())
+
+	codec := rpcClient(t, s1)
+
+	get := &structs.JobListRequest{
+		QueryOptions: structs.QueryOptions{
+			Region:    "global",
+			Namespace: "default",
+		},
+	}
+
+	// check that the request times out while waiting for consistency
+	var resp structs.JobListResponse
+	err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp)
+	require.Error(t, err)
+	require.Contains(t, err.Error(), structs.ErrNotReadyForConsistentReads.Error())
+
+	// check that we block and wait until the gate opens
+	go func() {
+		time.Sleep(5 * time.Millisecond)
+		s1.setConsistentReadReady()
+	}()
+
+	err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp)
+	require.NoError(t, err)
+
+}
+
 func TestRPC_forwardRegion(t *testing.T) {
 	t.Parallel()
 	s1 := TestServer(t, nil)
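Note that the gate check sits inside the `CHECK_LEADER` retry loop, so a leader that is still catching up does not fail requests immediately: the RPC blocks, retries with jitter, and only returns `ErrNotReadyForConsistentReads` once `RPCHoldTimeout` expires. That is what the test above drives with its 20ms hold timeout and the goroutine that opens the gate after 5ms. Below is a self-contained sketch of the same hold-and-retry pattern; every name in it is illustrative, and only the shape matches nomad/rpc.go:

```go
package main

import (
	"errors"
	"fmt"
	"math/rand"
	"sync/atomic"
	"time"
)

var errNotReady = errors.New("Not ready to serve consistent reads")

type server struct {
	ready       int32 // plays the role of Server.readyForConsistentReads
	holdTimeout time.Duration
}

func (s *server) isReady() bool { return atomic.LoadInt32(&s.ready) == 1 }

// checkLeader blocks for up to holdTimeout waiting for the gate to open,
// then gives up with a specific error so callers can tell a catching-up
// leader apart from having no leader at all.
func (s *server) checkLeader() error {
	deadline := time.Now().Add(s.holdTimeout)
	for {
		if s.isReady() {
			return nil
		}
		if time.Now().After(deadline) {
			return errNotReady
		}
		// jittered sleep before re-checking, standing in for
		// lib.RandomStagger in the real code
		time.Sleep(time.Duration(rand.Int63n(int64(s.holdTimeout / 16))))
	}
}

func main() {
	s := &server{holdTimeout: 20 * time.Millisecond}
	go func() {
		time.Sleep(5 * time.Millisecond) // leader finishes catching up
		atomic.StoreInt32(&s.ready, 1)
	}()
	fmt.Println(s.checkLeader()) // <nil>: gate opened within the hold time
}
```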
diff --git a/nomad/server.go b/nomad/server.go
index 2df2dca6a..ecf9d0533 100644
--- a/nomad/server.go
+++ b/nomad/server.go
@@ -167,6 +167,9 @@ type Server struct {
 	// join/leave from the region.
 	reconcileCh chan serf.Member
 
+	// used to track when the server is ready to serve consistent reads, updated atomically
+	readyForConsistentReads int32
+
 	// eventCh is used to receive events from the serf cluster
 	eventCh chan serf.Event
 
@@ -1400,6 +1403,21 @@ func (s *Server) getLeaderAcl() string {
 	return s.leaderAcl
 }
 
+// Atomically sets the readiness state flag when leadership is obtained, to indicate that the server is past its barrier write
+func (s *Server) setConsistentReadReady() {
+	atomic.StoreInt32(&s.readyForConsistentReads, 1)
+}
+
+// Atomically resets the readiness state flag when leadership is revoked
+func (s *Server) resetConsistentReadReady() {
+	atomic.StoreInt32(&s.readyForConsistentReads, 0)
+}
+
+// Returns true if this server is ready to serve consistent reads
+func (s *Server) isReadyForConsistentReads() bool {
+	return atomic.LoadInt32(&s.readyForConsistentReads) == 1
+}
+
 // Regions returns the known regions in the cluster.
 func (s *Server) Regions() []string {
 	s.peerLock.RLock()
diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go
index f16461250..5b9166d3f 100644
--- a/nomad/structs/errors.go
+++ b/nomad/structs/errors.go
@@ -7,14 +7,15 @@ import (
 )
 
 const (
-	errNoLeader            = "No cluster leader"
-	errNoRegionPath        = "No path to region"
-	errTokenNotFound       = "ACL token not found"
-	errPermissionDenied    = "Permission denied"
-	errNoNodeConn          = "No path to node"
-	errUnknownMethod       = "Unknown rpc method"
-	errUnknownNomadVersion = "Unable to determine Nomad version"
-	errNodeLacksRpc        = "Node does not support RPC; requires 0.8 or later"
+	errNoLeader                   = "No cluster leader"
+	errNotReadyForConsistentReads = "Not ready to serve consistent reads"
+	errNoRegionPath               = "No path to region"
+	errTokenNotFound              = "ACL token not found"
+	errPermissionDenied           = "Permission denied"
+	errNoNodeConn                 = "No path to node"
+	errUnknownMethod              = "Unknown rpc method"
+	errUnknownNomadVersion        = "Unable to determine Nomad version"
+	errNodeLacksRpc               = "Node does not support RPC; requires 0.8 or later"
 
 	// Prefix based errors that are used to check if the error is of a given
 	// type. These errors should be created with the associated constructor.
@@ -26,14 +27,15 @@ const (
 )
 
 var (
-	ErrNoLeader            = errors.New(errNoLeader)
-	ErrNoRegionPath        = errors.New(errNoRegionPath)
-	ErrTokenNotFound       = errors.New(errTokenNotFound)
-	ErrPermissionDenied    = errors.New(errPermissionDenied)
-	ErrNoNodeConn          = errors.New(errNoNodeConn)
-	ErrUnknownMethod       = errors.New(errUnknownMethod)
-	ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
-	ErrNodeLacksRpc        = errors.New(errNodeLacksRpc)
+	ErrNoLeader                   = errors.New(errNoLeader)
+	ErrNotReadyForConsistentReads = errors.New(errNotReadyForConsistentReads)
+	ErrNoRegionPath               = errors.New(errNoRegionPath)
+	ErrTokenNotFound              = errors.New(errTokenNotFound)
+	ErrPermissionDenied           = errors.New(errPermissionDenied)
+	ErrNoNodeConn                 = errors.New(errNoNodeConn)
+	ErrUnknownMethod              = errors.New(errUnknownMethod)
+	ErrUnknownNomadVersion        = errors.New(errUnknownNomadVersion)
+	ErrNodeLacksRpc               = errors.New(errNodeLacksRpc)
 )
 
 // IsErrNoLeader returns whether the error is due to there being no leader.
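One consequence of surfacing this as a plain error value: like `ErrNoLeader`, it crosses the msgpack RPC boundary as a flattened string, which is why the test above matches on `err.Error()` with `require.Contains` rather than comparing errors directly. If callers later need to branch on it, a detection helper in the style of the existing `IsErrNoLeader` could look like the following; this is a hypothetical follow-up, not part of this patch:

```go
// Hypothetical helper for nomad/structs/errors.go, mirroring the
// strings.Contains pattern used by IsErrNoLeader; not added here.
func IsErrNotReadyForConsistentReads(err error) bool {
	return err != nil && strings.Contains(err.Error(), errNotReadyForConsistentReads)
}
```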