rpc: only attempt NodeRpc for nodes>=0.8

Attempting a NodeRpc (or a streaming node RPC) against a client that does
not support it causes the call to hang indefinitely: although the TCP
connection exists, the client will never respond.
This commit is contained in:
Michael Schurter
2018-04-06 11:33:08 -07:00
parent c79a9166db
commit 2f479e7b15
9 changed files with 440 additions and 51 deletions

View File

@@ -2,6 +2,7 @@ package nomad
import (
"errors"
"fmt"
"time"
metrics "github.com/armon/go-metrics"
@@ -41,16 +42,29 @@ func (a *ClientAllocations) GarbageCollectAll(args *structs.NodeSpecificRequest,
return errors.New("missing NodeID")
}
// Make sure Node is valid and new enough to support RPC
snap, err := a.srv.State().Snapshot()
if err != nil {
return err
}
node, err := snap.NodeByID(nil, args.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", args.NodeID)
}
if err := nodeSupportsRpc(node); err != nil {
return err
}
// Get the connection to the client
state, ok := a.srv.getNodeConn(args.NodeID)
if !ok {
// Check if the node even exists
snap, err := a.srv.State().Snapshot()
if err != nil {
return err
}
return findNodeConnAndForward(a.srv, snap, args.NodeID, "ClientAllocations.GarbageCollectAll", args, reply)
return findNodeConnAndForward(a.srv, args.NodeID, "ClientAllocations.GarbageCollectAll", args, reply)
}
// Make the RPC
@@ -97,10 +111,24 @@ func (a *ClientAllocations) GarbageCollect(args *structs.AllocSpecificRequest, r
return structs.NewErrUnknownAllocation(args.AllocID)
}
// Make sure Node is valid and new enough to support RPC
node, err := snap.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", alloc.NodeID)
}
if err := nodeSupportsRpc(node); err != nil {
return err
}
// Get the connection to the client
state, ok := a.srv.getNodeConn(alloc.NodeID)
if !ok {
return findNodeConnAndForward(a.srv, snap, alloc.NodeID, "ClientAllocations.GarbageCollect", args, reply)
return findNodeConnAndForward(a.srv, alloc.NodeID, "ClientAllocations.GarbageCollect", args, reply)
}
// Make the RPC
@@ -147,10 +175,24 @@ func (a *ClientAllocations) Stats(args *cstructs.AllocStatsRequest, reply *cstru
return structs.NewErrUnknownAllocation(args.AllocID)
}
// Make sure Node is valid and new enough to support RPC
node, err := snap.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", alloc.NodeID)
}
if err := nodeSupportsRpc(node); err != nil {
return err
}
// Get the connection to the client
state, ok := a.srv.getNodeConn(alloc.NodeID)
if !ok {
return findNodeConnAndForward(a.srv, snap, alloc.NodeID, "ClientAllocations.Stats", args, reply)
return findNodeConnAndForward(a.srv, alloc.NodeID, "ClientAllocations.Stats", args, reply)
}
// Make the RPC

View File

@@ -139,6 +139,39 @@ func TestClientAllocations_GarbageCollectAll_NoNode(t *testing.T) {
require.Contains(err.Error(), "Unknown node")
}
// TestClientAllocations_GarbageCollectAll_OldNode checks the errors returned by
// ClientAllocations.GarbageCollectAll for a node whose "nomad.version"
// attribute is "0.7.1" and for a node with no "nomad.version" attribute.
func TestClientAllocations_GarbageCollectAll_OldNode(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and fake an old client
	s := TestServer(t, nil)
	defer s.Shutdown()
	state := s.State()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Test for an old version error
	node := mock.Node()
	node.Attributes["nomad.version"] = "0.7.1"
	require.Nil(state.UpsertNode(1005, node))

	req := &structs.NodeSpecificRequest{
		NodeID:       node.ID,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp structs.GenericResponse
	err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp)
	// Assert a non-nil error first: err.Error() below is evaluated before
	// require.True runs and would panic on a nil error.
	require.NotNil(err)
	require.True(structs.IsErrNodeLacksRpc(err), err.Error())

	// Test for a missing version error
	delete(node.Attributes, "nomad.version")
	require.Nil(state.UpsertNode(1006, node))

	err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp)
	require.NotNil(err)
	require.True(structs.IsErrUnknownNomadVersion(err), err.Error())
}
func TestClientAllocations_GarbageCollectAll_Remote(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -185,6 +218,46 @@ func TestClientAllocations_GarbageCollectAll_Remote(t *testing.T) {
require.Nil(err)
}
// TestClientAllocations_GarbageCollect_OldNode checks the errors returned by
// ClientAllocations.GarbageCollect for an allocation placed on a node whose
// "nomad.version" attribute is "0.7.1", and then on the same node with the
// attribute removed.
func TestClientAllocations_GarbageCollect_OldNode(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and fake an old client
	s := TestServer(t, nil)
	defer s.Shutdown()
	state := s.State()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Test for an old version error
	node := mock.Node()
	node.Attributes["nomad.version"] = "0.7.1"
	require.Nil(state.UpsertNode(1005, node))

	alloc := mock.Alloc()
	alloc.NodeID = node.ID
	require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc}))

	req := &structs.AllocSpecificRequest{
		AllocID: alloc.ID,
		QueryOptions: structs.QueryOptions{
			Region:    "global",
			Namespace: structs.DefaultNamespace,
		},
	}

	var resp structs.GenericResponse
	err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp)
	// Assert a non-nil error first: err.Error() is evaluated as an argument
	// before require.True runs and would panic on a nil error.
	require.NotNil(err)
	require.True(structs.IsErrNodeLacksRpc(err), err.Error())

	// Test for a missing version error
	delete(node.Attributes, "nomad.version")
	require.Nil(state.UpsertNode(1007, node))

	err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp)
	require.NotNil(err)
	require.True(structs.IsErrUnknownNomadVersion(err), err.Error())
}
func TestClientAllocations_GarbageCollect_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -417,6 +490,45 @@ func TestClientAllocations_GarbageCollect_Remote(t *testing.T) {
require.Nil(err)
}
// TestClientAllocations_Stats_OldNode checks the errors returned by
// ClientAllocations.Stats for an allocation placed on a node whose
// "nomad.version" attribute is "0.7.1", and then on the same node with the
// attribute removed.
func TestClientAllocations_Stats_OldNode(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server and fake an old client
	s := TestServer(t, nil)
	defer s.Shutdown()
	state := s.State()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Test for an old version error
	node := mock.Node()
	node.Attributes["nomad.version"] = "0.7.1"
	require.Nil(state.UpsertNode(1005, node))

	alloc := mock.Alloc()
	alloc.NodeID = node.ID
	require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc}))

	req := &structs.AllocSpecificRequest{
		AllocID: alloc.ID,
		QueryOptions: structs.QueryOptions{
			Region: "global",
		},
	}

	var resp structs.GenericResponse
	err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp)
	// Assert a non-nil error first: err.Error() is evaluated as an argument
	// before require.True runs and would panic on a nil error.
	require.NotNil(err)
	require.True(structs.IsErrNodeLacksRpc(err), err.Error())

	// Test for a missing version error
	delete(node.Attributes, "nomad.version")
	require.Nil(state.UpsertNode(1007, node))

	err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp)
	require.NotNil(err)
	require.True(structs.IsErrUnknownNomadVersion(err), err.Error())
}
func TestClientAllocations_Stats_Local(t *testing.T) {
t.Parallel()
require := require.New(t)

View File

@@ -2,6 +2,7 @@ package nomad
import (
"errors"
"fmt"
"io"
"net"
"strings"
@@ -130,10 +131,24 @@ func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListRe
return structs.NewErrUnknownAllocation(args.AllocID)
}
// Make sure Node is valid and new enough to support RPC
node, err := snap.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", alloc.NodeID)
}
if err := nodeSupportsRpc(node); err != nil {
return err
}
// Get the connection to the client
state, ok := f.srv.getNodeConn(alloc.NodeID)
if !ok {
return findNodeConnAndForward(f.srv, snap, alloc.NodeID, "FileSystem.List", args, reply)
return findNodeConnAndForward(f.srv, alloc.NodeID, "FileSystem.List", args, reply)
}
// Make the RPC
@@ -179,10 +194,24 @@ func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatRe
return structs.NewErrUnknownAllocation(args.AllocID)
}
// Make sure Node is valid and new enough to support RPC
node, err := snap.NodeByID(nil, alloc.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", alloc.NodeID)
}
if err := nodeSupportsRpc(node); err != nil {
return err
}
// Get the connection to the client
state, ok := f.srv.getNodeConn(alloc.NodeID)
if !ok {
return findNodeConnAndForward(f.srv, snap, alloc.NodeID, "FileSystem.Stat", args, reply)
return findNodeConnAndForward(f.srv, alloc.NodeID, "FileSystem.Stat", args, reply)
}
// Make the RPC
@@ -245,6 +274,24 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
}
nodeID := alloc.NodeID
// Make sure Node is valid and new enough to support RPC
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
if err := nodeSupportsRpc(node); err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
// Get the connection to the client either by forwarding to another server
// or creating a direct stream
var clientConn net.Conn
@@ -348,6 +395,24 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
}
nodeID := alloc.NodeID
// Make sure Node is valid and new enough to support RPC
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder)
return
}
if node == nil {
err := fmt.Errorf("Unknown node %q", nodeID)
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
if err := nodeSupportsRpc(node); err != nil {
f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder)
return
}
// Get the connection to the client either by forwarding to another server
// or creating a direct stream
var clientConn net.Conn

View File

@@ -259,6 +259,37 @@ func TestClientFS_List_Remote(t *testing.T) {
require.NotEmpty(resp.Files)
}
// TestClientFS_Stat_OldNode checks that FileSystem.Stat returns the
// "node lacks RPC" error for an allocation placed on a node whose
// "nomad.version" attribute is "0.7.1".
func TestClientFS_Stat_OldNode(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server
	s := TestServer(t, nil)
	defer s.Shutdown()
	state := s.State()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Test for an old version error
	node := mock.Node()
	node.Attributes["nomad.version"] = "0.7.1"
	require.Nil(state.UpsertNode(1005, node))

	alloc := mock.Alloc()
	alloc.NodeID = node.ID
	require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc}))

	req := &cstructs.FsStatRequest{
		AllocID:      alloc.ID,
		Path:         "/",
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	var resp cstructs.FsStatResponse
	err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp)
	// Assert a non-nil error first: err.Error() is evaluated as an argument
	// before require.True runs and would panic on a nil error.
	require.NotNil(err)
	require.True(structs.IsErrNodeLacksRpc(err), err.Error())
}
func TestClientFS_Stat_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
@@ -1310,6 +1341,86 @@ OUTER:
}
}
// TestClientFS_Logs_OldNode drives the streaming FileSystem.Logs handler over
// an in-memory pipe for an allocation placed on a node whose "nomad.version"
// attribute is "0.7.1", and waits (up to 5 seconds) for a stream message whose
// error satisfies structs.IsErrNodeLacksRpc.
func TestClientFS_Logs_OldNode(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server
	s := TestServer(t, nil)
	defer s.Shutdown()
	state := s.State()
	testutil.WaitForLeader(t, s.RPC)

	// Test for an old version error
	node := mock.Node()
	node.Attributes["nomad.version"] = "0.7.1"
	require.Nil(state.UpsertNode(1005, node))

	alloc := mock.Alloc()
	alloc.NodeID = node.ID
	require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc}))

	req := &cstructs.FsLogsRequest{
		AllocID:      alloc.ID,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Get the handler
	handler, err := s.StreamingRpcHandler("FileSystem.Logs")
	require.Nil(err)

	// Create a pipe
	p1, p2 := net.Pipe()
	defer p1.Close()
	defer p2.Close()

	errCh := make(chan error)
	streamMsg := make(chan *cstructs.StreamErrWrapper)

	// Start the handler
	go handler(p2)

	// Start the decoder
	go func() {
		decoder := codec.NewDecoder(p1, structs.MsgpackHandle)
		for {
			var msg cstructs.StreamErrWrapper
			if err := decoder.Decode(&msg); err != nil {
				if err == io.EOF || strings.Contains(err.Error(), "closed") {
					return
				}
				errCh <- fmt.Errorf("error decoding: %v", err)
				// Stop after reporting a decode failure; the message was
				// not decoded successfully and must not be forwarded.
				return
			}

			streamMsg <- &msg
		}
	}()

	// Send the request
	encoder := codec.NewEncoder(p1, structs.MsgpackHandle)
	require.Nil(encoder.Encode(req))

	timeout := time.After(5 * time.Second)

OUTER:
	for {
		select {
		case <-timeout:
			t.Fatal("timeout")
		case err := <-errCh:
			t.Fatal(err)
		case msg := <-streamMsg:
			// Skip messages that carry no error
			if msg.Error == nil {
				continue
			}

			if structs.IsErrNodeLacksRpc(msg.Error) {
				break OUTER
			}
		}
	}
}
func TestClientFS_Logs_ACL(t *testing.T) {
t.Parallel()
require := require.New(t)

View File

@@ -9,7 +9,6 @@ import (
multierror "github.com/hashicorp/go-multierror"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/yamux"
"github.com/ugorji/go/codec"
@@ -237,18 +236,7 @@ func NodeStreamingRpc(session *yamux.Session, method string) (net.Conn, error) {
// findNodeConnAndForward is a helper for finding the server with a connection
// to the given node and forwarding the RPC to the correct server. This does not
// work for streaming RPCs.
func findNodeConnAndForward(srv *Server, snap *state.StateSnapshot,
nodeID, method string, args, reply interface{}) error {
node, err := snap.NodeByID(nil, nodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", nodeID)
}
func findNodeConnAndForward(srv *Server, nodeID, method string, args, reply interface{}) error {
// Determine the Server that has a connection to the node.
srvWithConn, err := srv.serverWithNodeConn(nodeID, srv.Region())
if err != nil {

View File

@@ -40,23 +40,29 @@ func (s *ClientStats) Stats(args *nstructs.NodeSpecificRequest, reply *structs.C
return errors.New("missing NodeID")
}
// Check if the node even exists and is compatible with NodeRpc
snap, err := s.srv.State().Snapshot()
if err != nil {
return err
}
node, err := snap.NodeByID(nil, args.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", args.NodeID)
}
// Make sure Node is new enough to support RPC
if err := nodeSupportsRpc(node); err != nil {
return err
}
// Get the connection to the client
state, ok := s.srv.getNodeConn(args.NodeID)
if !ok {
// Check if the node even exists
snap, err := s.srv.State().Snapshot()
if err != nil {
return err
}
node, err := snap.NodeByID(nil, args.NodeID)
if err != nil {
return err
}
if node == nil {
return fmt.Errorf("Unknown node %q", args.NodeID)
}
// Determine the Server that has a connection to the node.
srv, err := s.srv.serverWithNodeConn(args.NodeID, s.srv.Region())

View File

@@ -126,7 +126,7 @@ func TestClientStats_Stats_NoNode(t *testing.T) {
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
// Make the request without having a node-id
// Make the request with a nonexistent node-id
req := &structs.NodeSpecificRequest{
NodeID: uuid.Generate(),
QueryOptions: structs.QueryOptions{Region: "global"},
@@ -140,6 +140,33 @@ func TestClientStats_Stats_NoNode(t *testing.T) {
require.Contains(err.Error(), "Unknown node")
}
// TestClientStats_Stats_OldNode checks that ClientStats.Stats returns the
// "node lacks RPC" error for a node whose "nomad.version" attribute is "0.7.1".
func TestClientStats_Stats_OldNode(t *testing.T) {
	t.Parallel()
	require := require.New(t)

	// Start a server
	s := TestServer(t, nil)
	defer s.Shutdown()
	state := s.State()
	codec := rpcClient(t, s)
	testutil.WaitForLeader(t, s.RPC)

	// Test for an old version error
	node := mock.Node()
	node.Attributes["nomad.version"] = "0.7.1"
	require.Nil(state.UpsertNode(1005, node))

	req := &structs.NodeSpecificRequest{
		NodeID:       node.ID,
		QueryOptions: structs.QueryOptions{Region: "global"},
	}

	// Fetch the response
	var resp cstructs.ClientStatsResponse
	err := msgpackrpc.CallWithCodec(codec, "ClientStats.Stats", req, &resp)
	// Assert a non-nil error first: err.Error() is evaluated as an argument
	// before require.True runs and would panic on a nil error.
	require.NotNil(err)
	require.True(structs.IsErrNodeLacksRpc(err), err.Error())
}
func TestClientStats_Stats_Remote(t *testing.T) {
t.Parallel()
require := require.New(t)

View File

@@ -7,12 +7,14 @@ import (
)
const (
errNoLeader = "No cluster leader"
errNoRegionPath = "No path to region"
errTokenNotFound = "ACL token not found"
errPermissionDenied = "Permission denied"
errNoNodeConn = "No path to node"
errUnknownMethod = "Unknown rpc method"
errNoLeader = "No cluster leader"
errNoRegionPath = "No path to region"
errTokenNotFound = "ACL token not found"
errPermissionDenied = "Permission denied"
errNoNodeConn = "No path to node"
errUnknownMethod = "Unknown rpc method"
errUnknownNomadVersion = "Unable to determine Nomad version"
errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later"
// Prefix based errors that are used to check if the error is of a given
// type. These errors should be created with the associated constructor.
@@ -24,12 +26,14 @@ const (
)
var (
ErrNoLeader = errors.New(errNoLeader)
ErrNoRegionPath = errors.New(errNoRegionPath)
ErrTokenNotFound = errors.New(errTokenNotFound)
ErrPermissionDenied = errors.New(errPermissionDenied)
ErrNoNodeConn = errors.New(errNoNodeConn)
ErrUnknownMethod = errors.New(errUnknownMethod)
ErrNoLeader = errors.New(errNoLeader)
ErrNoRegionPath = errors.New(errNoRegionPath)
ErrTokenNotFound = errors.New(errTokenNotFound)
ErrPermissionDenied = errors.New(errPermissionDenied)
ErrNoNodeConn = errors.New(errNoNodeConn)
ErrUnknownMethod = errors.New(errUnknownMethod)
ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion)
ErrNodeLacksRpc = errors.New(errNodeLacksRpc)
)
// IsErrNoLeader returns whether the error is due to there being no leader.
@@ -124,3 +128,15 @@ func IsErrUnknownEvaluation(err error) bool {
func IsErrUnknownDeployment(err error) bool {
return err != nil && strings.Contains(err.Error(), ErrUnknownDeploymentPrefix)
}
// IsErrUnknownNomadVersion reports whether err signals that the Nomad version
// of a node could not be determined. The check is a substring match on the
// error text, so a nil error never matches.
func IsErrUnknownNomadVersion(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), errUnknownNomadVersion)
}
// IsErrNodeLacksRpc reports whether err signals that a Nomad server could not
// reach a client node because the client is too old (pre-v0.8) to support RPC.
// The check is a substring match on the error text, so a nil error never
// matches.
func IsErrNodeLacksRpc(err error) bool {
	if err == nil {
		return false
	}
	return strings.Contains(err.Error(), errNodeLacksRpc)
}

View File

@@ -9,6 +9,7 @@ import (
"strconv"
version "github.com/hashicorp/go-version"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/serf/serf"
)
@@ -201,3 +202,24 @@ func maxUint64(inputs ...uint64) uint64 {
}
return max
}
// minRpcVersion is the version nodeSupportsRpc compares a node's
// "nomad.version" attribute against; nodes reporting anything lower are
// rejected with structs.ErrNodeLacksRpc.
var minRpcVersion = version.Must(version.NewVersion("0.8.0-rc1"))
// nodeSupportsRpc checks the node's "nomad.version" attribute against
// minRpcVersion. It returns structs.ErrUnknownNomadVersion when the attribute
// is absent or cannot be parsed as a version, structs.ErrNodeLacksRpc when the
// parsed version is lower than minRpcVersion, and nil otherwise.
func nodeSupportsRpc(node *structs.Node) error {
	raw, found := node.Attributes["nomad.version"]
	if !found {
		return structs.ErrUnknownNomadVersion
	}

	parsed, parseErr := version.NewVersion(raw)
	switch {
	case parseErr != nil:
		// An unparseable version is treated the same as a missing one
		return structs.ErrUnknownNomadVersion
	case parsed.LessThan(minRpcVersion):
		return structs.ErrNodeLacksRpc
	}

	return nil
}