From 99c0bdf9d784c7fd943d0bada2d0cf0df71ed084 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Tue, 6 Feb 2018 15:33:02 -0800 Subject: [PATCH] Implement MultiplexV2 RPC handling Implements and tests the V2 multiplexer. This will not be used until several versions of Nomad have been released to mitigate upgrade concerns. --- nomad/rpc.go | 73 +++++++++++++++++++++++++++++++++++++++++++---- nomad/rpc_test.go | 61 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 128 insertions(+), 6 deletions(-) diff --git a/nomad/rpc.go b/nomad/rpc.go index 63b5d8d28..f89fdcc4c 100644 --- a/nomad/rpc.go +++ b/nomad/rpc.go @@ -283,7 +283,56 @@ func (s *Server) handleStreamingConn(conn net.Conn) { // using the Yamux multiplexer. Version 2 handling allows a single connection to // switch streams between regulars RPCs and Streaming RPCs. func (s *Server) handleMultiplexV2(conn net.Conn, ctx *RPCContext) { - // TODO + defer func() { + // Remove any potential mapping between a NodeID to this connection and + // close the underlying connection. + s.removeNodeConn(ctx) + conn.Close() + }() + + conf := yamux.DefaultConfig() + conf.LogOutput = s.config.LogOutput + server, _ := yamux.Server(conn, conf) + + // Update the context to store the yamux session + ctx.Session = server + + // Create the RPC server for this connection + rpcServer := rpc.NewServer() + s.setupRpcServer(rpcServer, ctx) + + for { + // Accept a new stream + sub, err := server.Accept() + if err != nil { + if err != io.EOF { + s.logger.Printf("[ERR] nomad.rpc: multiplex_v2 conn accept failed: %v", err) + } + return + } + + // Read a single byte + buf := make([]byte, 1) + if _, err := sub.Read(buf); err != nil { + if err != io.EOF { + s.logger.Printf("[ERR] nomad.rpc: multiplex_v2 failed to read byte: %v", err) + } + return + } + + // Determine which handler to use + switch pool.RPCType(buf[0]) { + case pool.RpcNomad: + go s.handleNomadConn(sub, rpcServer) + case pool.RpcStreaming: + go s.handleStreamingConn(sub) + + default: + s.logger.Printf("[ERR] nomad.rpc: multiplex_v2 unrecognized RPC byte: %v", buf[0]) + return + } + } + } // forward is used to forward to a remote region or to forward to the local leader @@ -419,6 +468,18 @@ func (s *Server) streamingRpc(server *serverParts, method string) (net.Conn, err tcp.SetNoDelay(true) } + if err := s.streamingRpcImpl(conn, method); err != nil { + return nil, err + } + + return conn, nil +} + +// streamingRpcImpl takes a pre-established connection to a server and conducts +// the handshake to establish a streaming RPC for the given method. If an error +// is returned, the underlying connection has been closed. Otherwise it is +// assumed that the connection has been hijacked by the RPC method. +func (s *Server) streamingRpcImpl(conn net.Conn, method string) error { // TODO TLS // Check if TLS is enabled //if p.tlsWrap != nil { @@ -440,7 +501,7 @@ func (s *Server) streamingRpc(server *serverParts, method string) (net.Conn, err // Write the multiplex byte to set the mode if _, err := conn.Write([]byte{byte(pool.RpcStreaming)}); err != nil { conn.Close() - return nil, err + return err } // Send the header @@ -451,22 +512,22 @@ func (s *Server) streamingRpc(server *serverParts, method string) (net.Conn, err } if err := encoder.Encode(header); err != nil { conn.Close() - return nil, err + return err } // Wait for the acknowledgement var ack structs.StreamingRpcAck if err := decoder.Decode(&ack); err != nil { conn.Close() - return nil, err + return err } if ack.Error != "" { conn.Close() - return nil, errors.New(ack.Error) + return errors.New(ack.Error) } - return conn, nil + return nil } // raftApplyFuture is used to encode a message, run it through raft, and return the Raft future. diff --git a/nomad/rpc_test.go b/nomad/rpc_test.go index dd3bd4e1b..eb85af57e 100644 --- a/nomad/rpc_test.go +++ b/nomad/rpc_test.go @@ -10,11 +10,13 @@ import ( msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/helper/pool" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/nomad/structs/config" "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/raft" + "github.com/hashicorp/yamux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -201,3 +203,62 @@ func TestRPC_streamingRpcConn_badMethod(t *testing.T) { require.NotNil(err) require.Contains(err.Error(), "unknown rpc method: \"Bogus\"") } + +// COMPAT: Remove in 0.10 +// This is a very low level test to assert that the V2 handling works. It is +// making manual RPC calls since no helpers exist at this point since we are +// only implementing support for v2 but not using it yet. In the future we can +// switch the conn pool to establishing v2 connections and we can deprecate this +// test. +func TestRPC_handleMultiplexV2(t *testing.T) { + t.Parallel() + require := require.New(t) + s := TestServer(t, nil) + defer s.Shutdown() + testutil.WaitForLeader(t, s.RPC) + + p1, p2 := net.Pipe() + defer p1.Close() + defer p2.Close() + + // Start the handler + doneCh := make(chan struct{}) + go func() { + s.handleConn(p2, &RPCContext{Conn: p2}) + close(doneCh) + }() + + // Establish the MultiplexV2 connection + _, err := p1.Write([]byte{byte(pool.RpcMultiplexV2)}) + require.Nil(err) + + // Make two streams + conf := yamux.DefaultConfig() + conf.LogOutput = testlog.NewWriter(t) + session, err := yamux.Client(p1, conf) + require.Nil(err) + + s1, err := session.Open() + require.Nil(err) + defer s1.Close() + + s2, err := session.Open() + require.Nil(err) + defer s2.Close() + + // Make an RPC + _, err = s1.Write([]byte{byte(pool.RpcNomad)}) + require.Nil(err) + + args := &structs.GenericRequest{} + var l string + err = msgpackrpc.CallWithCodec(pool.NewClientCodec(s1), "Status.Leader", args, &l) + require.Nil(err) + require.NotEmpty(l) + + // Make a streaming RPC + err = s.streamingRpcImpl(s2, "Bogus") + require.NotNil(err) + require.Contains(err.Error(), "unknown rpc") + +}