Implement MultiplexV2 RPC handling

Implements and tests the V2 multiplexer. This will not be used until
several versions of Nomad have been released to mitigate upgrade
concerns.
This commit is contained in:
Alex Dadgar
2018-02-06 15:33:02 -08:00
parent 0706f4e86f
commit 99c0bdf9d7
2 changed files with 128 additions and 6 deletions

View File

@@ -283,7 +283,56 @@ func (s *Server) handleStreamingConn(conn net.Conn) {
// using the Yamux multiplexer. Version 2 handling allows a single connection to
// switch streams between regular RPCs and Streaming RPCs.
func (s *Server) handleMultiplexV2(conn net.Conn, ctx *RPCContext) {
	defer func() {
		// Remove any potential mapping between a NodeID to this connection and
		// close the underlying connection.
		s.removeNodeConn(ctx)
		conn.Close()
	}()

	conf := yamux.DefaultConfig()
	conf.LogOutput = s.config.LogOutput

	// yamux.Server only fails on an invalid config; log and bail rather than
	// silently discarding the error and continuing with a nil session.
	server, err := yamux.Server(conn, conf)
	if err != nil {
		s.logger.Printf("[ERR] nomad.rpc: multiplex_v2 failed to create yamux server: %v", err)
		return
	}

	// Update the context to store the yamux session
	ctx.Session = server

	// Create the RPC server for this connection
	rpcServer := rpc.NewServer()
	s.setupRpcServer(rpcServer, ctx)

	for {
		// Accept a new stream
		sub, err := server.Accept()
		if err != nil {
			if err != io.EOF {
				s.logger.Printf("[ERR] nomad.rpc: multiplex_v2 conn accept failed: %v", err)
			}
			return
		}

		// Read a single byte to determine the sub-stream's protocol.
		buf := make([]byte, 1)
		if _, err := sub.Read(buf); err != nil {
			if err != io.EOF {
				s.logger.Printf("[ERR] nomad.rpc: multiplex_v2 failed to read byte: %v", err)
			}
			return
		}

		// Determine which handler to use
		switch pool.RPCType(buf[0]) {
		case pool.RpcNomad:
			go s.handleNomadConn(sub, rpcServer)
		case pool.RpcStreaming:
			go s.handleStreamingConn(sub)

		default:
			s.logger.Printf("[ERR] nomad.rpc: multiplex_v2 unrecognized RPC byte: %v", buf[0])
			return
		}
	}
}
// forward is used to forward to a remote region or to forward to the local leader
@@ -419,6 +468,18 @@ func (s *Server) streamingRpc(server *serverParts, method string) (net.Conn, err
tcp.SetNoDelay(true)
}
if err := s.streamingRpcImpl(conn, method); err != nil {
return nil, err
}
return conn, nil
}
// streamingRpcImpl takes a pre-established connection to a server and conducts
// the handshake to establish a streaming RPC for the given method. If an error
// is returned, the underlying connection has been closed. Otherwise it is
// assumed that the connection has been hijacked by the RPC method.
func (s *Server) streamingRpcImpl(conn net.Conn, method string) error {
// TODO TLS
// Check if TLS is enabled
//if p.tlsWrap != nil {
@@ -440,7 +501,7 @@ func (s *Server) streamingRpc(server *serverParts, method string) (net.Conn, err
// Write the multiplex byte to set the mode
if _, err := conn.Write([]byte{byte(pool.RpcStreaming)}); err != nil {
conn.Close()
return nil, err
return err
}
// Send the header
@@ -451,22 +512,22 @@ func (s *Server) streamingRpc(server *serverParts, method string) (net.Conn, err
}
if err := encoder.Encode(header); err != nil {
conn.Close()
return nil, err
return err
}
// Wait for the acknowledgement
var ack structs.StreamingRpcAck
if err := decoder.Decode(&ack); err != nil {
conn.Close()
return nil, err
return err
}
if ack.Error != "" {
conn.Close()
return nil, errors.New(ack.Error)
return errors.New(ack.Error)
}
return conn, nil
return nil
}
// raftApplyFuture is used to encode a message, run it through raft, and return the Raft future.

View File

@@ -10,11 +10,13 @@ import (
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/helper/pool"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/nomad/structs/config"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/raft"
"github.com/hashicorp/yamux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
@@ -201,3 +203,62 @@ func TestRPC_streamingRpcConn_badMethod(t *testing.T) {
require.NotNil(err)
require.Contains(err.Error(), "unknown rpc method: \"Bogus\"")
}
// COMPAT: Remove in 0.10
// This is a very low level test to assert that the V2 handling works. It is
// making manual RPC calls since no helpers exist at this point since we are
// only implementing support for v2 but not using it yet. In the future we can
// switch the conn pool to establishing v2 connections and we can deprecate this
// test.
// COMPAT: Remove in 0.10
// This is a very low level test to assert that the V2 handling works. It is
// making manual RPC calls since no helpers exist at this point since we are
// only implementing support for v2 but not using it yet. In the future we can
// switch the conn pool to establishing v2 connections and we can deprecate this
// test.
func TestRPC_handleMultiplexV2(t *testing.T) {
	t.Parallel()
	r := require.New(t)

	srv := TestServer(t, nil)
	defer srv.Shutdown()
	testutil.WaitForLeader(t, srv.RPC)

	clientSide, serverSide := net.Pipe()
	defer clientSide.Close()
	defer serverSide.Close()

	// Hand the server end of the pipe to the connection handler.
	doneCh := make(chan struct{})
	go func() {
		defer close(doneCh)
		srv.handleConn(serverSide, &RPCContext{Conn: serverSide})
	}()

	// Declare that this connection speaks the MultiplexV2 protocol.
	_, err := clientSide.Write([]byte{byte(pool.RpcMultiplexV2)})
	r.NoError(err)

	// Open two independent streams over a yamux client session.
	conf := yamux.DefaultConfig()
	conf.LogOutput = testlog.NewWriter(t)
	session, err := yamux.Client(clientSide, conf)
	r.NoError(err)

	s1, err := session.Open()
	r.NoError(err)
	defer s1.Close()

	s2, err := session.Open()
	r.NoError(err)
	defer s2.Close()

	// Issue a regular Nomad RPC over the first stream.
	_, err = s1.Write([]byte{byte(pool.RpcNomad)})
	r.NoError(err)

	args := &structs.GenericRequest{}
	var leader string
	err = msgpackrpc.CallWithCodec(pool.NewClientCodec(s1), "Status.Leader", args, &leader)
	r.NoError(err)
	r.NotEmpty(leader)

	// Issue a streaming RPC with a bogus method over the second stream.
	err = srv.streamingRpcImpl(s2, "Bogus")
	r.Error(err)
	r.Contains(err.Error(), "unknown rpc")
}