mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
Merge pull request #5911 from hashicorp/b-rpc-consistent-reads
Block rpc handling until state store is caught up
This commit is contained in:
@@ -283,6 +283,8 @@ func (s *Server) establishLeadership(stopCh chan struct{}) error {
|
||||
return err
|
||||
}
|
||||
|
||||
s.setConsistentReadReady()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -767,6 +769,8 @@ func (s *Server) iterateJobStatusMetrics(jobs *memdb.ResultIterator) {
|
||||
func (s *Server) revokeLeadership() error {
|
||||
defer metrics.MeasureSince([]string{"nomad", "leader", "revoke_leadership"}, time.Now())
|
||||
|
||||
s.resetConsistentReadReady()
|
||||
|
||||
// Clear the leader token since we are no longer the leader.
|
||||
s.setLeaderAcl("")
|
||||
|
||||
|
||||
@@ -1126,6 +1126,25 @@ func TestLeader_RevokeLeadership_MultipleTimes(t *testing.T) {
|
||||
require.Nil(t, s1.revokeLeadership())
|
||||
}
|
||||
|
||||
func TestLeader_TransitionsUpdateConsistencyRead(t *testing.T) {
|
||||
s1 := TestServer(t, nil)
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return s1.isReadyForConsistentReads(), nil
|
||||
}, func(err error) {
|
||||
require.Fail(t, "should have finished establish leader loop")
|
||||
})
|
||||
|
||||
require.Nil(t, s1.revokeLeadership())
|
||||
require.False(t, s1.isReadyForConsistentReads())
|
||||
|
||||
ch := make(chan struct{})
|
||||
require.Nil(t, s1.establishLeadership(ch))
|
||||
require.True(t, s1.isReadyForConsistentReads())
|
||||
}
|
||||
|
||||
// Test doing an inplace upgrade on a server from raft protocol 2 to 3
|
||||
// This verifies that removing the server and adding it back with a uuid works
|
||||
// even if the server's address stays the same.
|
||||
|
||||
@@ -432,7 +432,7 @@ CHECK_LEADER:
|
||||
isLeader, remoteServer := r.getLeader()
|
||||
|
||||
// Handle the case we are the leader
|
||||
if isLeader {
|
||||
if isLeader && r.Server.isReadyForConsistentReads() {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
@@ -457,7 +457,11 @@ CHECK_LEADER:
|
||||
}
|
||||
}
|
||||
|
||||
// No leader found and hold time exceeded
|
||||
// hold time exceeded without being ready to respond
|
||||
if isLeader {
|
||||
return true, structs.ErrNotReadyForConsistentReads
|
||||
}
|
||||
|
||||
return true, structs.ErrNoLeader
|
||||
}
|
||||
|
||||
|
||||
@@ -77,6 +77,47 @@ func TestRPC_forwardLeader(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRPC_WaitForConsistentReads(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, func(c *Config) {
|
||||
c.RPCHoldTimeout = 20 * time.Millisecond
|
||||
})
|
||||
defer s1.Shutdown()
|
||||
testutil.WaitForLeader(t, s1.RPC)
|
||||
|
||||
isLeader, _ := s1.getLeader()
|
||||
require.True(t, isLeader)
|
||||
require.True(t, s1.isReadyForConsistentReads())
|
||||
|
||||
s1.resetConsistentReadReady()
|
||||
require.False(t, s1.isReadyForConsistentReads())
|
||||
|
||||
codec := rpcClient(t, s1)
|
||||
|
||||
get := &structs.JobListRequest{
|
||||
QueryOptions: structs.QueryOptions{
|
||||
Region: "global",
|
||||
Namespace: "default",
|
||||
},
|
||||
}
|
||||
|
||||
// check timeout while waiting for consistency
|
||||
var resp structs.JobListResponse
|
||||
err := msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp)
|
||||
require.Error(t, err)
|
||||
require.Contains(t, err.Error(), structs.ErrNotReadyForConsistentReads.Error())
|
||||
|
||||
// check we wait and block
|
||||
go func() {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
s1.setConsistentReadReady()
|
||||
}()
|
||||
|
||||
err = msgpackrpc.CallWithCodec(codec, "Job.List", get, &resp)
|
||||
require.NoError(t, err)
|
||||
|
||||
}
|
||||
|
||||
func TestRPC_forwardRegion(t *testing.T) {
|
||||
t.Parallel()
|
||||
s1 := TestServer(t, nil)
|
||||
|
||||
@@ -167,6 +167,9 @@ type Server struct {
|
||||
// join/leave from the region.
|
||||
reconcileCh chan serf.Member
|
||||
|
||||
// used to track when the server is ready to serve consistent reads, updated atomically
|
||||
readyForConsistentReads int32
|
||||
|
||||
// eventCh is used to receive events from the serf cluster
|
||||
eventCh chan serf.Event
|
||||
|
||||
@@ -1400,6 +1403,21 @@ func (s *Server) getLeaderAcl() string {
|
||||
return s.leaderAcl
|
||||
}
|
||||
|
||||
// Atomically sets a readiness state flag when leadership is obtained, to indicate that server is past its barrier write
|
||||
func (s *Server) setConsistentReadReady() {
|
||||
atomic.StoreInt32(&s.readyForConsistentReads, 1)
|
||||
}
|
||||
|
||||
// Atomically reset readiness state flag on leadership revoke
|
||||
func (s *Server) resetConsistentReadReady() {
|
||||
atomic.StoreInt32(&s.readyForConsistentReads, 0)
|
||||
}
|
||||
|
||||
// Returns true if this server is ready to serve consistent reads
|
||||
func (s *Server) isReadyForConsistentReads() bool {
|
||||
return atomic.LoadInt32(&s.readyForConsistentReads) == 1
|
||||
}
|
||||
|
||||
// Regions returns the known regions in the cluster.
|
||||
func (s *Server) Regions() []string {
|
||||
s.peerLock.RLock()
|
||||
|
||||
@@ -7,14 +7,15 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
errNoLeader = "No cluster leader"
|
||||
errNoRegionPath = "No path to region"
|
||||
errTokenNotFound = "ACL token not found"
|
||||
errPermissionDenied = "Permission denied"
|
||||
errNoNodeConn = "No path to node"
|
||||
errUnknownMethod = "Unknown rpc method"
|
||||
errUnknownNomadVersion = "Unable to determine Nomad version"
|
||||
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"
|
||||
errNoLeader = "No cluster leader"
|
||||
errNotReadyForConsistentReads = "Not ready to serve consistent reads"
|
||||
errNoRegionPath = "No path to region"
|
||||
errTokenNotFound = "ACL token not found"
|
||||
errPermissionDenied = "Permission denied"
|
||||
errNoNodeConn = "No path to node"
|
||||
errUnknownMethod = "Unknown rpc method"
|
||||
errUnknownNomadVersion = "Unable to determine Nomad version"
|
||||
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"
|
||||
|
||||
// Prefix based errors that are used to check if the error is of a given
|
||||
// type. These errors should be created with the associated constructor.
|
||||
@@ -26,14 +27,15 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrNoLeader = errors.New(errNoLeader)
|
||||
ErrNoRegionPath = errors.New(errNoRegionPath)
|
||||
ErrTokenNotFound = errors.New(errTokenNotFound)
|
||||
ErrPermissionDenied = errors.New(errPermissionDenied)
|
||||
ErrNoNodeConn = errors.New(errNoNodeConn)
|
||||
ErrUnknownMethod = errors.New(errUnknownMethod)
|
||||
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
|
||||
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
|
||||
ErrNoLeader = errors.New(errNoLeader)
|
||||
ErrNotReadyForConsistentReads = errors.New(errNotReadyForConsistentReads)
|
||||
ErrNoRegionPath = errors.New(errNoRegionPath)
|
||||
ErrTokenNotFound = errors.New(errTokenNotFound)
|
||||
ErrPermissionDenied = errors.New(errPermissionDenied)
|
||||
ErrNoNodeConn = errors.New(errNoNodeConn)
|
||||
ErrUnknownMethod = errors.New(errUnknownMethod)
|
||||
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
|
||||
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
|
||||
)
|
||||
|
||||
// IsErrNoLeader returns whether the error is due to there being no leader.
|
||||
|
||||
Reference in New Issue
Block a user