diff --git a/nomad/client_fs_endpoint.go b/nomad/client_fs_endpoint.go index f79e51ee8..b1731bfe8 100644 --- a/nomad/client_fs_endpoint.go +++ b/nomad/client_fs_endpoint.go @@ -71,6 +71,52 @@ func (f *FileSystem) findNodeConnAndForward(snap *state.StateSnapshot, return f.srv.forwardServer(srv, method, args, reply) } +// forwardRegionStreamingRpc is used to make a streaming RPC to a different +// region. It looks up the allocation in the remote region to determine what +// remote server can route the request. +func (f *FileSystem) forwardRegionStreamingRpc(conn io.ReadWriteCloser, + encoder *codec.Encoder, args interface{}, method, allocID string, qo *structs.QueryOptions) { + // Request the allocation from the target region + allocReq := &structs.AllocSpecificRequest{ + AllocID: allocID, + QueryOptions: *qo, + } + var allocResp structs.SingleAllocResponse + if err := f.srv.forwardRegion(qo.RequestRegion(), "Alloc.GetAlloc", allocReq, &allocResp); err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + + if allocResp.Alloc == nil { + f.handleStreamResultError(fmt.Errorf("unknown allocation %q", allocID), nil, encoder) + return + } + + // Determine the Server that has a connection to the node. + srv, err := f.srv.serverWithNodeConn(allocResp.Alloc.NodeID, qo.RequestRegion()) + if err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + + // Get a connection to the server + srvConn, err := f.srv.streamingRpc(srv, method) + if err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + defer srvConn.Close() + + // Send the request. + outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle) + if err := outEncoder.Encode(args); err != nil { + f.handleStreamResultError(err, nil, encoder) + return + } + + structs.Bridge(conn, srvConn) +} + // List is used to list the contents of an allocation's directory. func (f *FileSystem) List(args *cstructs.FsListRequest, reply *cstructs.FsListResponse) error { // We only allow stale reads since the only potentially stale information is @@ -187,47 +233,9 @@ func (f *FileSystem) stream(conn io.ReadWriteCloser) { // Check if we need to forward to a different region if r := args.RequestRegion(); r != f.srv.Region() { - // Request the allocation from the target region - allocReq := &structs.AllocSpecificRequest{ - AllocID: args.AllocID, - QueryOptions: args.QueryOptions, - } - var allocResp structs.SingleAllocResponse - if err := f.srv.forwardRegion(r, "Alloc.GetAlloc", allocReq, &allocResp); err != nil { - f.handleStreamResultError(err, nil, encoder) - return - } - - if allocResp.Alloc == nil { - f.handleStreamResultError(fmt.Errorf("unknown allocation %q", args.AllocID), nil, encoder) - return - } - - // Determine the Server that has a connection to the node. - srv, err := f.srv.serverWithNodeConn(allocResp.Alloc.NodeID, r) - if err != nil { - f.handleStreamResultError(err, nil, encoder) - return - } - - // Get a connection to the server - srvConn, err := f.srv.streamingRpc(srv, "FileSystem.Stream") - if err != nil { - f.handleStreamResultError(err, nil, encoder) - return - } - defer srvConn.Close() - - // Send the request. - outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle) - if err := outEncoder.Encode(args); err != nil { - f.handleStreamResultError(err, nil, encoder) - return - } - - structs.Bridge(conn, srvConn) + f.forwardRegionStreamingRpc(conn, encoder, &args, "FileSystem.Stream", + args.AllocID, &args.QueryOptions) return - } // Check node read permissions @@ -321,47 +329,9 @@ func (f *FileSystem) logs(conn io.ReadWriteCloser) { // Check if we need to forward to a different region if r := args.RequestRegion(); r != f.srv.Region() { - // Request the allocation from the target region - allocReq := &structs.AllocSpecificRequest{ - AllocID: args.AllocID, - QueryOptions: args.QueryOptions, - } - var allocResp structs.SingleAllocResponse - if err := f.srv.forwardRegion(r, "Alloc.GetAlloc", allocReq, &allocResp); err != nil { - f.handleStreamResultError(err, nil, encoder) - return - } - - if allocResp.Alloc == nil { - f.handleStreamResultError(fmt.Errorf("unknown allocation %q", args.AllocID), nil, encoder) - return - } - - // Determine the Server that has a connection to the node. - srv, err := f.srv.serverWithNodeConn(allocResp.Alloc.NodeID, r) - if err != nil { - f.handleStreamResultError(err, nil, encoder) - return - } - - // Get a connection to the server - srvConn, err := f.srv.streamingRpc(srv, "FileSystem.Logs") - if err != nil { - f.handleStreamResultError(err, nil, encoder) - return - } - defer srvConn.Close() - - // Send the request. - outEncoder := codec.NewEncoder(srvConn, structs.MsgpackHandle) - if err := outEncoder.Encode(args); err != nil { - f.handleStreamResultError(err, nil, encoder) - return - } - - structs.Bridge(conn, srvConn) + f.forwardRegionStreamingRpc(conn, encoder, &args, "FileSystem.Logs", + args.AllocID, &args.QueryOptions) return - } // Check node read permissions