From e05cd42063f1cc8c3e3f2ecb38dfc9e8ace7f218 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 8 Feb 2018 15:00:22 -0800 Subject: [PATCH] Use helper for forwarding --- command/agent/helpers.go | 2 ++ nomad/client_alloc_endpoint.go | 65 ++-------------------------------- nomad/client_fs_endpoint.go | 33 ++--------------- nomad/client_rpc.go | 29 +++++++++++++++ 4 files changed, 36 insertions(+), 93 deletions(-) diff --git a/command/agent/helpers.go b/command/agent/helpers.go index ec1716a3b..50542416c 100644 --- a/command/agent/helpers.go +++ b/command/agent/helpers.go @@ -10,6 +10,8 @@ func (s *HTTPServer) rpcHandlerForAlloc(allocID string) (localClient, remoteClie // See if the local client can handle the request. localAlloc := false if c != nil { + // If there is an error it means that the client doesn't have the + // allocation so we can't use the local client _, err := c.GetClientAlloc(allocID) if err == nil { localAlloc = true diff --git a/nomad/client_alloc_endpoint.go b/nomad/client_alloc_endpoint.go index 4c05e9733..e174811df 100644 --- a/nomad/client_alloc_endpoint.go +++ b/nomad/client_alloc_endpoint.go @@ -51,26 +51,7 @@ func (a *ClientAllocations) GarbageCollectAll(args *structs.NodeSpecificRequest, return err } - node, err := snap.NodeByID(nil, args.NodeID) - if err != nil { - return err - } - - if node == nil { - return fmt.Errorf("Unknown node %q", args.NodeID) - } - - // Determine the Server that has a connection to the node. - srv, err := a.srv.serverWithNodeConn(args.NodeID, a.srv.Region()) - if err != nil { - return err - } - - if srv == nil { - return structs.ErrNoNodeConn - } - - return a.srv.forwardServer(srv, "ClientAllocations.GarbageCollectAll", args, reply) + return findNodeConnAndForward(a.srv, snap, args.NodeID, "ClientAllocations.GarbageCollectAll", args, reply) } // Make the RPC @@ -120,27 +101,7 @@ func (a *ClientAllocations) GarbageCollect(args *structs.AllocSpecificRequest, r // Get the connection to the client state, ok := a.srv.getNodeConn(alloc.NodeID) if !ok { - // Check if the node even exists - node, err := snap.NodeByID(nil, alloc.NodeID) - if err != nil { - return err - } - - if node == nil { - return fmt.Errorf("Unknown node %q", alloc.NodeID) - } - - // Determine the Server that has a connection to the node. - srv, err := a.srv.serverWithNodeConn(alloc.NodeID, a.srv.Region()) - if err != nil { - return err - } - - if srv == nil { - return structs.ErrNoNodeConn - } - - return a.srv.forwardServer(srv, "ClientAllocations.GarbageCollect", args, reply) + return findNodeConnAndForward(a.srv, snap, alloc.NodeID, "ClientAllocations.GarbageCollect", args, reply) } // Make the RPC @@ -190,27 +151,7 @@ func (a *ClientAllocations) Stats(args *cstructs.AllocStatsRequest, reply *cstru // Get the connection to the client state, ok := a.srv.getNodeConn(alloc.NodeID) if !ok { - // Check if the node even exists - node, err := snap.NodeByID(nil, alloc.NodeID) - if err != nil { - return err - } - - if node == nil { - return fmt.Errorf("Unknown node %q", alloc.NodeID) - } - - // Determine the Server that has a connection to the node. - srv, err := a.srv.serverWithNodeConn(alloc.NodeID, a.srv.Region()) - if err != nil { - return err - } - - if srv == nil { - return structs.ErrNoNodeConn - } - - return a.srv.forwardServer(srv, "ClientAllocations.Stats", args, reply) + return findNodeConnAndForward(a.srv, snap, alloc.NodeID, "ClientAllocations.Stats", args, reply) } // Make the RPC diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go index 013c89fec..23cbd26b5 100644 --- a/nomad/client_fs_endpoint.go +++ b/nomad/client_fs_endpoint.go @@ -11,7 +11,6 @@ import ( "github.com/hashicorp/nomad/acl" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper" - "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/ugorji/go/codec" ) @@ -42,34 +41,6 @@ func (f *FileSystem) handleStreamResultError(err error, code *int64, encoder *co }) } -// findNodeConnAndForward is a helper for finding the server with a connection -// to the given node and forwarding the RPC to the correct server. This does not -// work for streaming RPCs. -func (f *FileSystem) findNodeConnAndForward(snap *state.StateSnapshot, - nodeID, method string, args, reply interface{}) error { - - node, err := snap.NodeByID(nil, nodeID) - if err != nil { - return err - } - - if node == nil { - return structs.NewErrUnknownNode(nodeID) - } - - // Determine the Server that has a connection to the node. - srv, err := f.srv.serverWithNodeConn(nodeID, f.srv.Region()) - if err != nil { - return err - } - - if srv == nil { - return structs.ErrNoNodeConn - } - - return f.srv.forwardServer(srv, method, args, reply) -} - // forwardRegionStreamingRpc is used to make a streaming RPC to a different // region. It looks up the allocation in the remote region to determine what // remote server can route the request. @@ -162,7 +133,7 @@ func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListRe // Get the connection to the client state, ok := f.srv.getNodeConn(alloc.NodeID) if !ok { - return f.findNodeConnAndForward(snap, alloc.NodeID, "FileSystem.List", args, reply) + return findNodeConnAndForward(f.srv, snap, alloc.NodeID, "FileSystem.List", args, reply) } // Make the RPC @@ -211,7 +182,7 @@ func (f *FileSystem) Stat(args *cstructs.FsStatRequest, reply *cstructs.FsStatRe // Get the connection to the client state, ok := f.srv.getNodeConn(alloc.NodeID) if !ok { - return f.findNodeConnAndForward(snap, alloc.NodeID, "FileSystem.Stat", args, reply) + return findNodeConnAndForward(f.srv, snap, alloc.NodeID, "FileSystem.Stat", args, reply) } // Make the RPC diff --git a/nomad/client_rpc.go b/nomad/client_rpc.go index 92b749632..214f2c153 100644 --- a/nomad/client_rpc.go +++ b/nomad/client_rpc.go @@ -9,6 +9,7 @@ import ( multierror "github.com/hashicorp/go-multierror" msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/helper/pool" + "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/yamux" "github.com/ugorji/go/codec" @@ -215,3 +216,31 @@ func NodeStreamingRpc(session *yamux.Session, method string) (net.Conn, error) { return stream, nil } + +// findNodeConnAndForward is a helper for finding the server with a connection +// to the given node and forwarding the RPC to the correct server. This does not +// work for streaming RPCs. +func findNodeConnAndForward(srv *Server, snap *state.StateSnapshot, + nodeID, method string, args, reply interface{}) error { + + node, err := snap.NodeByID(nil, nodeID) + if err != nil { + return err + } + + if node == nil { + return fmt.Errorf("Unknown node %q", nodeID) + } + + // Determine the Server that has a connection to the node. + srvWithConn, err := srv.serverWithNodeConn(nodeID, srv.Region()) + if err != nil { + return err + } + + if srvWithConn == nil { + return structs.ErrNoNodeConn + } + + return srv.forwardServer(srvWithConn, method, args, reply) +}