diff --git a/nomad/client_alloc_endpoint.go b/nomad/client_alloc_endpoint.go index bb28d2d39..8495ce855 100644 --- a/nomad/client_alloc_endpoint.go +++ b/nomad/client_alloc_endpoint.go @@ -2,6 +2,7 @@ package nomad import ( "errors" + "fmt" "time" metrics "github.com/armon/go-metrics" @@ -41,16 +42,29 @@ func (a *ClientAllocations) GarbageCollectAll(args *structs.NodeSpecificRequest, return errors.New("missing NodeID") } + // Make sure Node is valid and new enough to support RPC + snap, err := a.srv.State().Snapshot() + if err != nil { + return err + } + + node, err := snap.NodeByID(nil, args.NodeID) + if err != nil { + return err + } + + if node == nil { + return fmt.Errorf("Unknown node %q", args.NodeID) + } + + if err := nodeSupportsRpc(node); err != nil { + return err + } + // Get the connection to the client state, ok := a.srv.getNodeConn(args.NodeID) if !ok { - // Check if the node even exists - snap, err := a.srv.State().Snapshot() - if err != nil { - return err - } - - return findNodeConnAndForward(a.srv, snap, args.NodeID, "ClientAllocations.GarbageCollectAll", args, reply) + return findNodeConnAndForward(a.srv, args.NodeID, "ClientAllocations.GarbageCollectAll", args, reply) } // Make the RPC @@ -97,10 +111,24 @@ func (a *ClientAllocations) GarbageCollect(args *structs.AllocSpecificRequest, r return structs.NewErrUnknownAllocation(args.AllocID) } + // Make sure Node is valid and new enough to support RPC + node, err := snap.NodeByID(nil, alloc.NodeID) + if err != nil { + return err + } + + if node == nil { + return fmt.Errorf("Unknown node %q", alloc.NodeID) + } + + if err := nodeSupportsRpc(node); err != nil { + return err + } + // Get the connection to the client state, ok := a.srv.getNodeConn(alloc.NodeID) if !ok { - return findNodeConnAndForward(a.srv, snap, alloc.NodeID, "ClientAllocations.GarbageCollect", args, reply) + return findNodeConnAndForward(a.srv, alloc.NodeID, "ClientAllocations.GarbageCollect", args, reply) } // Make the RPC @@ -147,10 +175,24 @@ func (a *ClientAllocations) Stats(args *cstructs.AllocStatsRequest, reply *cstru return structs.NewErrUnknownAllocation(args.AllocID) } + // Make sure Node is valid and new enough to support RPC + node, err := snap.NodeByID(nil, alloc.NodeID) + if err != nil { + return err + } + + if node == nil { + return fmt.Errorf("Unknown node %q", alloc.NodeID) + } + + if err := nodeSupportsRpc(node); err != nil { + return err + } + // Get the connection to the client state, ok := a.srv.getNodeConn(alloc.NodeID) if !ok { - return findNodeConnAndForward(a.srv, snap, alloc.NodeID, "ClientAllocations.Stats", args, reply) + return findNodeConnAndForward(a.srv, alloc.NodeID, "ClientAllocations.Stats", args, reply) } // Make the RPC diff --git a/nomad/client_alloc_endpoint_test.go b/nomad/client_alloc_endpoint_test.go index c905b1b79..1df7485e3 100644 --- a/nomad/client_alloc_endpoint_test.go +++ b/nomad/client_alloc_endpoint_test.go @@ -139,6 +139,39 @@ func TestClientAllocations_GarbageCollectAll_NoNode(t *testing.T) { require.Contains(err.Error(), "Unknown node") } +func TestClientAllocations_GarbageCollectAll_OldNode(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and fake an old client + s := TestServer(t, nil) + defer s.Shutdown() + state := s.State() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Test for an old version error + node := mock.Node() + node.Attributes["nomad.version"] = "0.7.1" + require.Nil(state.UpsertNode(1005, node)) + + req := &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp) + require.True(structs.IsErrNodeLacksRpc(err)) + + // Test for a missing version error + delete(node.Attributes, "nomad.version") + require.Nil(state.UpsertNode(1006, node)) + + err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollectAll", req, &resp) + require.True(structs.IsErrUnknownNomadVersion(err)) +} + func TestClientAllocations_GarbageCollectAll_Remote(t *testing.T) { t.Parallel() require := require.New(t) @@ -185,6 +218,46 @@ func TestClientAllocations_GarbageCollectAll_Remote(t *testing.T) { require.Nil(err) } +func TestClientAllocations_GarbageCollect_OldNode(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and fake an old client + s := TestServer(t, nil) + defer s.Shutdown() + state := s.State() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Test for an old version error + node := mock.Node() + node.Attributes["nomad.version"] = "0.7.1" + require.Nil(state.UpsertNode(1005, node)) + + alloc := mock.Alloc() + alloc.NodeID = node.ID + require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc})) + + req := &structs.AllocSpecificRequest{ + AllocID: alloc.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + Namespace: structs.DefaultNamespace, + }, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp) + require.True(structs.IsErrNodeLacksRpc(err), err.Error()) + + // Test for a missing version error + delete(node.Attributes, "nomad.version") + require.Nil(state.UpsertNode(1007, node)) + + err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.GarbageCollect", req, &resp) + require.True(structs.IsErrUnknownNomadVersion(err), err.Error()) +} + func TestClientAllocations_GarbageCollect_Local(t *testing.T) { t.Parallel() require := require.New(t) @@ -417,6 +490,45 @@ func TestClientAllocations_GarbageCollect_Remote(t *testing.T) { require.Nil(err) } +func TestClientAllocations_Stats_OldNode(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and fake an old client + s := TestServer(t, nil) + defer s.Shutdown() + state := s.State() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Test for an old version error + node := mock.Node() + node.Attributes["nomad.version"] = "0.7.1" + require.Nil(state.UpsertNode(1005, node)) + + alloc := mock.Alloc() + alloc.NodeID = node.ID + require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc})) + + req := &structs.AllocSpecificRequest{ + AllocID: alloc.ID, + QueryOptions: structs.QueryOptions{ + Region: "global", + }, + } + + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp) + require.True(structs.IsErrNodeLacksRpc(err), err.Error()) + + // Test for a missing version error + delete(node.Attributes, "nomad.version") + require.Nil(state.UpsertNode(1007, node)) + + err = msgpackrpc.CallWithCodec(codec, "ClientAllocations.Stats", req, &resp) + require.True(structs.IsErrUnknownNomadVersion(err), err.Error()) +} + func TestClientAllocations_Stats_Local(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go index 23cbd26b5..5c54dea98 100644 --- a/nomad/client_fs_endpoint.go +++ b/nomad/client_fs_endpoint.go @@ -2,6 +2,7 @@ package nomad import ( "errors" + "fmt" "io" "net" "strings" @@ -130,10 +131,24 @@ func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListRe return structs.NewErrUnknownAllocation(args.AllocID) } + // Make sure Node is valid and new enough to support RPC + node, err := snap.NodeByID(nil, alloc.NodeID) + if err != nil { + return err + } + + if node == nil { + return fmt.Errorf("Unknown node %q", alloc.NodeID) + } + + if err := nodeSupportsRpc(node); err != nil { + return err + } + // Get the connection to the client state, ok := f.srv.getNodeConn(alloc.NodeID) if !ok { - return findNodeConnAndForward(f.srv, snap, alloc.NodeID, "FileSystem.List", args, reply) + return findNodeConnAndForward(f.srv, alloc.NodeID, "FileSystem.List", args, reply) } // Make the RPC @@ -179,10 +194,24 @@ func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatRe return structs.NewErrUnknownAllocation(args.AllocID) } + // Make sure Node is valid and new enough to support RPC + node, err := snap.NodeByID(nil, alloc.NodeID) + if err != nil { + return err + } + + if node == nil { + return fmt.Errorf("Unknown node %q", alloc.NodeID) + } + + if err := nodeSupportsRpc(node); err != nil { + return err + } + // Get the connection to the client state, ok := f.srv.getNodeConn(alloc.NodeID) if !ok { - return findNodeConnAndForward(f.srv, snap, alloc.NodeID, "FileSystem.Stat", args, reply) + return findNodeConnAndForward(f.srv, alloc.NodeID, "FileSystem.Stat", args, reply) } // Make the RPC @@ -245,6 +274,24 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { } nodeID := alloc.NodeID + // Make sure Node is valid and new enough to support RPC + node, err := snap.NodeByID(nil, nodeID) + if err != nil { + f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + if node == nil { + err := fmt.Errorf("Unknown node %q", nodeID) + f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + if err := nodeSupportsRpc(node); err != nil { + f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + // Get the connection to the client either by forwarding to another server // or creating a direct stream var clientConn net.Conn @@ -348,6 +395,24 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { } nodeID := alloc.NodeID + // Make sure Node is valid and new enough to support RPC + node, err := snap.NodeByID(nil, nodeID) + if err != nil { + f.handleStreamResultError(err, helper.Int64ToPtr(500), encoder) + return + } + + if node == nil { + err := fmt.Errorf("Unknown node %q", nodeID) + f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + + if err := nodeSupportsRpc(node); err != nil { + f.handleStreamResultError(err, helper.Int64ToPtr(400), encoder) + return + } + // Get the connection to the client either by forwarding to another server // or creating a direct stream var clientConn net.Conn diff --git a/nomad/client_fs_endpoint_test.go b/nomad/client_fs_endpoint_test.go index c396f833f..bdfdd6409 100644 --- a/nomad/client_fs_endpoint_test.go +++ b/nomad/client_fs_endpoint_test.go @@ -259,6 +259,37 @@ func TestClientFS_List_Remote(t *testing.T) { require.NotEmpty(resp.Files) } +func TestClientFS_Stat_OldNode(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server + s := TestServer(t, nil) + defer s.Shutdown() + state := s.State() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Test for an old version error + node := mock.Node() + node.Attributes["nomad.version"] = "0.7.1" + require.Nil(state.UpsertNode(1005, node)) + + alloc := mock.Alloc() + alloc.NodeID = node.ID + require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc})) + + req := &cstructs.FsStatRequest{ + AllocID: alloc.ID, + Path: "/", + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + var resp cstructs.FsStatResponse + err := msgpackrpc.CallWithCodec(codec, "FileSystem.Stat", req, &resp) + require.True(structs.IsErrNodeLacksRpc(err), err.Error()) +} + func TestClientFS_Stat_Local(t *testing.T) { t.Parallel() require := require.New(t) @@ -1310,6 +1341,86 @@ OUTER: } } +func TestClientFS_Logs_OldNode(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server + s := TestServer(t, nil) + defer s.Shutdown() + state := s.State() + testutil.WaitForLeader(t, s.RPC) + + // Test for an old version error + node := mock.Node() + node.Attributes["nomad.version"] = "0.7.1" + require.Nil(state.UpsertNode(1005, node)) + + alloc := mock.Alloc() + alloc.NodeID = node.ID + require.Nil(state.UpsertAllocs(1006, []*structs.Allocation{alloc})) + + req := &cstructs.FsLogsRequest{ + AllocID: alloc.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Get the handler + handler, err := s.StreamingRpcHandler("FileSystem.Logs") + require.Nil(err) + + // Create a pipe + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + errCh := make(chan error) + streamMsg := make(chan *cstructs.StreamErrWrapper) + + // Start the handler + go handler(p2) + + // Start the decoder + go func() { + decoder := codec.NewDecoder(p1, structs.MsgpackHandle) + for { + var msg cstructs.StreamErrWrapper + if err := decoder.Decode(&msg); err != nil { + if err == io.EOF || strings.Contains(err.Error(), "closed") { + return + } + errCh <- fmt.Errorf("error decoding: %v", err) + } + + streamMsg <- &msg + } + }() + + // Send the request + encoder := codec.NewEncoder(p1, structs.MsgpackHandle) + require.Nil(encoder.Encode(req)) + + timeout := time.After(5 * time.Second) + +OUTER: + for { + select { + case <-timeout: + t.Fatal("timeout") + case err := <-errCh: + t.Fatal(err) + case msg := <-streamMsg: + if msg.Error == nil { + continue + } + + if structs.IsErrNodeLacksRpc(msg.Error) { + break OUTER + } + } + } +} + func TestClientFS_Logs_ACL(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/client_rpc.go b/nomad/client_rpc.go index 2b8776253..af59679b0 100644 --- a/nomad/client_rpc.go +++ b/nomad/client_rpc.go @@ -9,7 +9,6 @@ import ( multierror "github.com/hashicorp/go-multierror" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/helper/pool" - "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/yamux" "github.com/ugorji/go/codec" @@ -237,18 +236,7 @@ func NodeStreamingRpc(session *yamux.Session, method string) (net.Conn, error) { // findNodeConnAndForward is a helper for finding the server with a connection // to the given node and forwarding the RPC to the correct server. This does not // work for streaming RPCs. -func findNodeConnAndForward(srv *Server, snap *state.StateSnapshot, - nodeID, method string, args, reply interface{}) error { - - node, err := snap.NodeByID(nil, nodeID) - if err != nil { - return err - } - - if node == nil { - return fmt.Errorf("Unknown node %q", nodeID) - } - +func findNodeConnAndForward(srv *Server, nodeID, method string, args, reply interface{}) error { // Determine the Server that has a connection to the node. srvWithConn, err := srv.serverWithNodeConn(nodeID, srv.Region()) if err != nil { diff --git a/nomad/client_stats_endpoint.go b/nomad/client_stats_endpoint.go index 7042bbf2b..3755a6277 100644 --- a/nomad/client_stats_endpoint.go +++ b/nomad/client_stats_endpoint.go @@ -40,23 +40,29 @@ func (s *ClientStats) Stats(args *nstructs.NodeSpecificRequest, reply *structs.C return errors.New("missing NodeID") } + // Check if the node even exists and is compatible with NodeRpc + snap, err := s.srv.State().Snapshot() + if err != nil { + return err + } + + node, err := snap.NodeByID(nil, args.NodeID) + if err != nil { + return err + } + + if node == nil { + return fmt.Errorf("Unknown node %q", args.NodeID) + } + + // Make sure Node is new enough to support RPC + if err := nodeSupportsRpc(node); err != nil { + return err + } + // Get the connection to the client state, ok := s.srv.getNodeConn(args.NodeID) if !ok { - // Check if the node even exists - snap, err := s.srv.State().Snapshot() - if err != nil { - return err - } - - node, err := snap.NodeByID(nil, args.NodeID) - if err != nil { - return err - } - - if node == nil { - return fmt.Errorf("Unknown node %q", args.NodeID) - } // Determine the Server that has a connection to the node. srv, err := s.srv.serverWithNodeConn(args.NodeID, s.srv.Region()) diff --git a/nomad/client_stats_endpoint_test.go b/nomad/client_stats_endpoint_test.go index 232be4cfb..66e42b059 100644 --- a/nomad/client_stats_endpoint_test.go +++ b/nomad/client_stats_endpoint_test.go @@ -126,7 +126,7 @@ func TestClientStats_Stats_NoNode(t *testing.T) { codec := rpcClient(t, s) testutil.WaitForLeader(t, s.RPC) - // Make the request without having a node-id + // Make the request with a nonexistent node-id req := &structs.NodeSpecificRequest{ NodeID: uuid.Generate(), QueryOptions: structs.QueryOptions{Region: "global"}, @@ -140,6 +140,33 @@ func TestClientStats_Stats_NoNode(t *testing.T) { require.Contains(err.Error(), "Unknown node") } +func TestClientStats_Stats_OldNode(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server + s := TestServer(t, nil) + defer s.Shutdown() + state := s.State() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + // Test for an old version error + node := mock.Node() + node.Attributes["nomad.version"] = "0.7.1" + require.Nil(state.UpsertNode(1005, node)) + + req := &structs.NodeSpecificRequest{ + NodeID: node.ID, + QueryOptions: structs.QueryOptions{Region: "global"}, + } + + // Fetch the response + var resp cstructs.ClientStatsResponse + err := msgpackrpc.CallWithCodec(codec, "ClientStats.Stats", req, &resp) + require.True(structs.IsErrNodeLacksRpc(err), err.Error()) +} + func TestClientStats_Stats_Remote(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/structs/errors.go b/nomad/structs/errors.go index f1139cbd7..f16461250 100644 --- a/nomad/structs/errors.go +++ b/nomad/structs/errors.go @@ -7,12 +7,14 @@ import ( ) const ( - errNoLeader = "No cluster leader" - errNoRegionPath = "No path to region" - errTokenNotFound = "ACL token not found" - errPermissionDenied = "Permission denied" - errNoNodeConn = "No path to node" - errUnknownMethod = "Unknown rpc method" + errNoLeader = "No cluster leader" + errNoRegionPath = "No path to region" + errTokenNotFound = "ACL token not found" + errPermissionDenied = "Permission denied" + errNoNodeConn = "No path to node" + errUnknownMethod = "Unknown rpc method" + errUnknownNomadVersion = "Unable to determine Nomad version" + errNodeLacksRpc = "Node does not support RPC; requires 0.8 or later" // Prefix based errors that are used to check if the error is of a given // type. These errors should be created with the associated constructor. @@ -24,12 +26,14 @@ const ( ) var ( - ErrNoLeader = errors.New(errNoLeader) - ErrNoRegionPath = errors.New(errNoRegionPath) - ErrTokenNotFound = errors.New(errTokenNotFound) - ErrPermissionDenied = errors.New(errPermissionDenied) - ErrNoNodeConn = errors.New(errNoNodeConn) - ErrUnknownMethod = errors.New(errUnknownMethod) + ErrNoLeader = errors.New(errNoLeader) + ErrNoRegionPath = errors.New(errNoRegionPath) + ErrTokenNotFound = errors.New(errTokenNotFound) + ErrPermissionDenied = errors.New(errPermissionDenied) + ErrNoNodeConn = errors.New(errNoNodeConn) + ErrUnknownMethod = errors.New(errUnknownMethod) + ErrUnknownNomadVersion = errors.New(errUnknownNomadVersion) + ErrNodeLacksRpc = errors.New(errNodeLacksRpc) ) // IsErrNoLeader returns whether the error is due to there being no leader. @@ -124,3 +128,15 @@ func IsErrUnknownEvaluation(err error) bool { func IsErrUnknownDeployment(err error) bool { return err != nil && strings.Contains(err.Error(), ErrUnknownDeploymentPrefix) } + +// IsErrUnknownNomadVersion returns whether the error is due to Nomad being +// unable to determine the version of a node. +func IsErrUnknownNomadVersion(err error) bool { + return err != nil && strings.Contains(err.Error(), errUnknownNomadVersion) +} + +// IsErrNodeLacksRpc returns whether error is due to a Nomad server being +// unable to connect to a client node because the client is too old (pre-v0.8). +func IsErrNodeLacksRpc(err error) bool { + return err != nil && strings.Contains(err.Error(), errNodeLacksRpc) +} diff --git a/nomad/util.go b/nomad/util.go index 48c85050f..257b023ae 100644 --- a/nomad/util.go +++ b/nomad/util.go @@ -9,6 +9,7 @@ import ( "strconv" version "github.com/hashicorp/go-version" + "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/serf/serf" ) @@ -201,3 +202,24 @@ func maxUint64(inputs ...uint64) uint64 { } return max } + +var minRpcVersion = version.Must(version.NewVersion("0.8.0-rc1")) + +// nodeSupportsRpc returns a non-nil error if a Node does not support RPC. +func nodeSupportsRpc(node *structs.Node) error { + rawNodeVer, ok := node.Attributes["nomad.version"] + if !ok { + return structs.ErrUnknownNomadVersion + } + + nodeVer, err := version.NewVersion(rawNodeVer) + if err != nil { + return structs.ErrUnknownNomadVersion + } + + if nodeVer.LessThan(minRpcVersion) { + return structs.ErrNodeLacksRpc + } + + return nil +}