Streaming helper

This commit is contained in:
Alex Dadgar
2018-02-08 13:53:06 -08:00
parent 3e4108634a
commit ff79fbcec8

View File

@@ -71,6 +71,52 @@ func (f *FileSystem) findNodeConnAndForward(snap *state.StateSnapshot,
return f.srv.forwardServer(srv, method, args, reply)
}
// forwardRegionStreamingRpc is used to make a streaming RPC to a different
// region. It looks up the allocation in the remote region to determine what
// remote server can route the request.
func (f *FileSystem) forwardRegionStreamingRpc(conn io.ReadWriteCloser,
encoder *codec.Encoder, args interface{}, method, allocID string, qo *structs.QueryOptions) {
// Request the allocation from the target region
allocReq := &structs.AllocSpecificRequest{
AllocID: allocID,
QueryOptions: *qo,
}
var allocResp structs.SingleAllocResponse
if err := f.srv.forwardRegion(qo.RequestRegion(), "Alloc.GetAlloc", allocReq, &allocResp); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
if allocResp.Alloc == nil {
f.handleStreamResultError(fmt.Errorf("unknown allocation %q", allocID), nil, encoder)
return
}
// Determine the Server that has a connection to the node.
srv, err := f.srv.serverWithNodeConn(allocResp.Alloc.NodeID, qo.RequestRegion())
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
// Get a connection to the server
srvConn, err := f.srv.streamingRpc(srv, method)
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
defer srvConn.Close()
// Send the request.
outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
structs.Bridge(conn, srvConn)
}
// List is used to list the contents of an allocation's directory.
func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListResponse) error {
// We only allow stale reads since the only potentially stale information is
@@ -187,47 +233,9 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) {
// Check if we need to forward to a different region
if r := args.RequestRegion(); r != f.srv.Region() {
// Request the allocation from the target region
allocReq := &structs.AllocSpecificRequest{
AllocID: args.AllocID,
QueryOptions: args.QueryOptions,
}
var allocResp structs.SingleAllocResponse
if err := f.srv.forwardRegion(r, "Alloc.GetAlloc", allocReq, &allocResp); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
if allocResp.Alloc == nil {
f.handleStreamResultError(fmt.Errorf("unknown allocation %q", args.AllocID), nil, encoder)
return
}
// Determine the Server that has a connection to the node.
srv, err := f.srv.serverWithNodeConn(allocResp.Alloc.NodeID, r)
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
// Get a connection to the server
srvConn, err := f.srv.streamingRpc(srv, "FileSystem.Stream")
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
defer srvConn.Close()
// Send the request.
outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
structs.Bridge(conn, srvConn)
f.forwardRegionStreamingRpc(conn, encoder, &args, "FileSystem.Stream",
args.AllocID, &args.QueryOptions)
return
}
// Check node read permissions
@@ -321,47 +329,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) {
// Check if we need to forward to a different region
if r := args.RequestRegion(); r != f.srv.Region() {
// Request the allocation from the target region
allocReq := &structs.AllocSpecificRequest{
AllocID: args.AllocID,
QueryOptions: args.QueryOptions,
}
var allocResp structs.SingleAllocResponse
if err := f.srv.forwardRegion(r, "Alloc.GetAlloc", allocReq, &allocResp); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
if allocResp.Alloc == nil {
f.handleStreamResultError(fmt.Errorf("unknown allocation %q", args.AllocID), nil, encoder)
return
}
// Determine the Server that has a connection to the node.
srv, err := f.srv.serverWithNodeConn(allocResp.Alloc.NodeID, r)
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
// Get a connection to the server
srvConn, err := f.srv.streamingRpc(srv, "FileSystem.Logs")
if err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
defer srvConn.Close()
// Send the request.
outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle)
if err := outEncoder.Encode(args); err != nil {
f.handleStreamResultError(err, nil, encoder)
return
}
structs.Bridge(conn, srvConn)
f.forwardRegionStreamingRpc(conn, encoder, &args, "FileSystem.Logs",
args.AllocID, &args.QueryOptions)
return
}
// Check node read permissions