diff --git a/nomad/rpc.go b/nomad/rpc.go index 28ad38fd2..d888a4f59 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -4,6 +4,7 @@ import ( "crypto/tls" "fmt" "io" + "math/rand" "net" "strings" "time" @@ -34,6 +35,18 @@ const ( ) const ( + // maxQueryTime is used to bound the limit of a blocking query + maxQueryTime = 300 * time.Second + + // defaultQueryTime is the amount of time we block waiting for a change + // if no time is specified. Previously we would wait the maxQueryTime. + defaultQueryTime = 300 * time.Second + + // jitterFraction is a the limit to the amount of jitter we apply + // to a user specified MaxQueryTime. We divide the specified time by + // the fraction. So 16 == 6.25% limit of jitter + jitterFraction = 16 + // Warn if the Raft command is larger than this. // If it's over 1MB something is probably being abusive. raftWarnSize = 1024 * 1024 @@ -152,6 +165,71 @@ func (s *Server) handleNomadConn(conn net.Conn) { } } +// forward is used to forward to a remote region or to forward to the local leader +// Returns a bool of if forwarding was performed, as well as any error +func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { + // Handle region forwarding + region := info.RequestRegion() + if region != s.config.Region { + err := s.forwardRegion(method, region, args, reply) + return true, err + } + + // Check if we can allow a stale read + if info.IsRead() && info.AllowStaleRead() { + return false, nil + } + + // Handle leader forwarding + if !s.IsLeader() { + err := s.forwardLeader(method, args, reply) + return true, err + } + return false, nil +} + +// forwardLeader is used to forward an RPC call to the leader, or fail if no leader +func (s *Server) forwardLeader(method string, args interface{}, reply interface{}) error { + // Get the leader + leader := s.raft.Leader() + if leader == "" { + return structs.ErrNoLeader + } + + // Lookup the server + s.peerLock.RLock() + server := s.localPeers[leader] + s.peerLock.RUnlock() + + // Handle a missing server + if server == nil { + return structs.ErrNoLeader + } + return s.connPool.RPC(s.config.Region, server.Addr, server.Version, method, args, reply) +} + +// forwardRegion is used to forward an RPC call to a remote region, or fail if no servers +func (s *Server) forwardRegion(method, region string, args interface{}, reply interface{}) error { + // Bail if we can't find any servers + s.peerLock.RLock() + servers := s.peers[region] + if len(servers) == 0 { + s.peerLock.RUnlock() + s.logger.Printf("[WARN] nomad.rpc: RPC request for region '%s', no path found", + region) + return structs.ErrNoRegionPath + } + + // Select a random addr + offset := rand.Int31() % int32(len(servers)) + server := servers[offset] + s.peerLock.RUnlock() + + // Forward to remote Nomad + metrics.IncrCounter([]string{"nomad", "rpc", "cross-region", region}, 1) + return s.connPool.RPC(region, server.Addr, server.Version, method, args, reply) +} + // raftApply is used to encode a message, run it through raft, and return // the FSM response along with any errors func (s *Server) raftApply(t structs.MessageType, msg interface{}) (interface{}, error) {