csi: Fix Controller RPCs

Currently the handling of CSINode RPCs does not correctly handle
forwarding RPCs to Nodes.

This commit fixes this by introducing a shim RPC
(nomad/client_csi_enpdoint) that will correctly forward the request to
the owning node, or submit the RPC to the client.

In the process it also cleans up handling a little bit by adding the
`CSIControllerQuery` embeded struct for required forwarding state.

The CSIControllerQuery embeding the requirement of a `PluginID` also
means we could move node targetting into the shim RPC if wanted in the
future.
This commit is contained in:
Danielle Lancashire
2020-02-21 11:32:10 +01:00
committed by Tim Gross
parent 2f72b480c2
commit fab252ce5d
8 changed files with 345 additions and 95 deletions

View File

@@ -70,7 +70,7 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV
// In the future this may be expanded to request dynamic secrets for attachement.
func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error {
defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now())
plugin, err := c.findControllerPlugin(req.PluginName)
plugin, err := c.findControllerPlugin(req.PluginID)
if err != nil {
return err
}
@@ -85,8 +85,8 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum
return errors.New("VolumeID is required")
}
if req.NodeID == "" {
return errors.New("NodeID is required")
if req.ClientCSINodeID == "" {
return errors.New("ClientCSINodeID is required")
}
if !nstructs.ValidCSIVolumeAccessMode(req.AccessMode) {
@@ -109,6 +109,10 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum
return nil
}
func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error {
return fmt.Errorf("Unimplemented")
}
func (c *CSIController) findControllerPlugin(name string) (csi.CSIPlugin, error) {
return c.findPlugin(dynamicplugins.PluginTypeCSIController, name)
}

View File

@@ -31,43 +31,53 @@ func TestCSIController_AttachVolume(t *testing.T) {
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerAttachVolumeRequest{
PluginName: "some-garbage",
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
},
{
Name: "validates volumeid is not empty",
Request: &structs.ClientCSIControllerAttachVolumeRequest{
PluginName: fakePlugin.Name,
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("VolumeID is required"),
},
{
Name: "validates nodeid is not empty",
Request: &structs.ClientCSIControllerAttachVolumeRequest{
PluginName: fakePlugin.Name,
VolumeID: "1234-4321-1234-4321",
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
},
ExpectedErr: errors.New("NodeID is required"),
ExpectedErr: errors.New("ClientCSINodeID is required"),
},
{
Name: "validates AccessMode",
Request: &structs.ClientCSIControllerAttachVolumeRequest{
PluginName: fakePlugin.Name,
VolumeID: "1234-4321-1234-4321",
NodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessMode("foo"),
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
ClientCSINodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessMode("foo"),
},
ExpectedErr: errors.New("Unknown access mode: foo"),
},
{
Name: "validates attachmentmode is not empty",
Request: &structs.ClientCSIControllerAttachVolumeRequest{
PluginName: fakePlugin.Name,
VolumeID: "1234-4321-1234-4321",
NodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"),
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
ClientCSINodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader,
AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"),
},
ExpectedErr: errors.New("Unknown attachment mode: bar"),
},
@@ -77,11 +87,13 @@ func TestCSIController_AttachVolume(t *testing.T) {
fc.NextControllerPublishVolumeErr = errors.New("hello")
},
Request: &structs.ClientCSIControllerAttachVolumeRequest{
PluginName: fakePlugin.Name,
VolumeID: "1234-4321-1234-4321",
NodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
ClientCSINodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
},
ExpectedErr: errors.New("hello"),
},
@@ -91,11 +103,13 @@ func TestCSIController_AttachVolume(t *testing.T) {
fc.NextControllerPublishVolumeResponse = &csi.ControllerPublishVolumeResponse{}
},
Request: &structs.ClientCSIControllerAttachVolumeRequest{
PluginName: fakePlugin.Name,
VolumeID: "1234-4321-1234-4321",
NodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
ClientCSINodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
},
ExpectedResponse: &structs.ClientCSIControllerAttachVolumeResponse{},
},
@@ -107,11 +121,13 @@ func TestCSIController_AttachVolume(t *testing.T) {
}
},
Request: &structs.ClientCSIControllerAttachVolumeRequest{
PluginName: fakePlugin.Name,
VolumeID: "1234-4321-1234-4321",
NodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
ClientCSINodeID: "abcde",
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
},
ExpectedResponse: &structs.ClientCSIControllerAttachVolumeResponse{
PublishContext: map[string]string{"foo": "bar"},
@@ -161,14 +177,18 @@ func TestCSIController_ValidateVolume(t *testing.T) {
{
Name: "validates volumeid is not empty",
Request: &structs.ClientCSIControllerValidateVolumeRequest{
PluginID: fakePlugin.Name,
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
},
ExpectedErr: errors.New("VolumeID is required"),
},
{
Name: "returns plugin not found errors",
Request: &structs.ClientCSIControllerValidateVolumeRequest{
PluginID: "some-garbage",
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: "some-garbage",
},
VolumeID: "foo",
},
ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"),
@@ -176,7 +196,9 @@ func TestCSIController_ValidateVolume(t *testing.T) {
{
Name: "validates attachmentmode",
Request: &structs.ClientCSIControllerValidateVolumeRequest{
PluginID: fakePlugin.Name,
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"),
AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader,
@@ -186,7 +208,9 @@ func TestCSIController_ValidateVolume(t *testing.T) {
{
Name: "validates AccessMode",
Request: &structs.ClientCSIControllerValidateVolumeRequest{
PluginID: fakePlugin.Name,
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,
AccessMode: nstructs.CSIVolumeAccessMode("foo"),
@@ -199,7 +223,9 @@ func TestCSIController_ValidateVolume(t *testing.T) {
fc.NextControllerValidateVolumeErr = errors.New("hello")
},
Request: &structs.ClientCSIControllerValidateVolumeRequest{
PluginID: fakePlugin.Name,
CSIControllerQuery: structs.CSIControllerQuery{
PluginID: fakePlugin.Name,
},
VolumeID: "1234-4321-1234-4321",
AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem,

View File

@@ -20,27 +20,36 @@ type CSIVolumeMountOptions struct {
MountFlags []string
}
type ClientCSIControllerValidateVolumeRequest struct {
// CSIControllerQuery is used to specify various flags for queries against CSI
// Controllers
type CSIControllerQuery struct {
// ControllerNodeID is the node that should be targeted by the request
ControllerNodeID string
// PluginID is the plugin that should be targeted on the given node.
PluginID string
}
type ClientCSIControllerValidateVolumeRequest struct {
VolumeID string
AttachmentMode structs.CSIVolumeAttachmentMode
AccessMode structs.CSIVolumeAccessMode
CSIControllerQuery
}
type ClientCSIControllerValidateVolumeResponse struct {
}
type ClientCSIControllerAttachVolumeRequest struct {
PluginName string
// The ID of the volume to be used on a node.
// This field is REQUIRED.
VolumeID string
// The ID of the node. This field is REQUIRED. This must match the NodeID that
// is fingerprinted by the target node for this plugin name.
NodeID string
ClientCSINodeID string
// AttachmentMode indicates how the volume should be attached and mounted into
// a task.
@@ -56,6 +65,8 @@ type ClientCSIControllerAttachVolumeRequest struct {
// ReadOnly indicates that the volume will be used in a readonly fashion. This
// only works when the Controller has the PublishReadonly capability.
ReadOnly bool
CSIControllerQuery
}
func (c *ClientCSIControllerAttachVolumeRequest) ToCSIRequest() *csi.ControllerPublishVolumeRequest {
@@ -65,7 +76,7 @@ func (c *ClientCSIControllerAttachVolumeRequest) ToCSIRequest() *csi.ControllerP
return &csi.ControllerPublishVolumeRequest{
VolumeID: c.VolumeID,
NodeID: c.NodeID,
NodeID: c.ClientCSINodeID,
ReadOnly: c.ReadOnly,
}
}
@@ -88,15 +99,16 @@ type ClientCSIControllerAttachVolumeResponse struct {
}
type ClientCSIControllerDetachVolumeRequest struct {
PluginName string
// The ID of the volume to be unpublished for the node
// This field is REQUIRED.
VolumeID string
// The ID of the node. This field is REQUIRED. This must match the NodeID that
// is fingerprinted by the target node for this plugin name.
NodeID string
// The CSI Node ID for the Node that the volume should be detached from.
// This field is REQUIRED. This must match the NodeID that is fingerprinted
// by the target node for this plugin name.
ClientCSINodeID string
CSIControllerQuery
}
func (c *ClientCSIControllerDetachVolumeRequest) ToCSIRequest() *csi.ControllerUnpublishVolumeRequest {
@@ -106,7 +118,7 @@ func (c *ClientCSIControllerDetachVolumeRequest) ToCSIRequest() *csi.ControllerU
return &csi.ControllerUnpublishVolumeRequest{
VolumeID: c.VolumeID,
NodeID: c.NodeID,
NodeID: c.ClientCSINodeID,
}
}

View File

@@ -0,0 +1,75 @@
package nomad
import (
"errors"
"time"
metrics "github.com/armon/go-metrics"
log "github.com/hashicorp/go-hclog"
cstructs "github.com/hashicorp/nomad/client/structs"
)
// ClientCSIController is used to forward RPC requests to the targed Nomad client's
// CSIController endpoint.
type ClientCSIController struct {
srv *Server
logger log.Logger
}
func (a *ClientCSIController) AttachVolume(args *cstructs.ClientCSIControllerAttachVolumeRequest, reply *cstructs.ClientCSIControllerAttachVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "attach_volume"}, time.Now())
// Verify the arguments.
if args.ControllerNodeID == "" {
return errors.New("missing ControllerNodeID")
}
// Make sure Node is valid and new enough to support RPC
snap, err := a.srv.State().Snapshot()
if err != nil {
return err
}
_, err = getNodeForRpc(snap, args.ControllerNodeID)
if err != nil {
return err
}
// Get the connection to the client
state, ok := a.srv.getNodeConn(args.ControllerNodeID)
if !ok {
return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.AttachVolume", args, reply)
}
// Make the RPC
return NodeRpc(state.Session, "CSIController.AttachVolume", args, reply)
}
func (a *ClientCSIController) ValidateVolume(args *cstructs.ClientCSIControllerValidateVolumeRequest, reply *cstructs.ClientCSIControllerValidateVolumeResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "validate_volume"}, time.Now())
// Verify the arguments.
if args.ControllerNodeID == "" {
return errors.New("missing ControllerNodeID")
}
// Make sure Node is valid and new enough to support RPC
snap, err := a.srv.State().Snapshot()
if err != nil {
return err
}
_, err = getNodeForRpc(snap, args.ControllerNodeID)
if err != nil {
return err
}
// Get the connection to the client
state, ok := a.srv.getNodeConn(args.ControllerNodeID)
if !ok {
return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.ValidateVolume", args, reply)
}
// Make the RPC
return NodeRpc(state.Session, "CSIController.ValidateVolume", args, reply)
}

View File

@@ -0,0 +1,93 @@
package nomad
import (
"testing"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/nomad/client"
"github.com/hashicorp/nomad/client/config"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/stretchr/testify/require"
)
func TestClientCSIController_AttachVolume_Local(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s, cleanupS := TestServer(t, nil)
defer cleanupS()
codec := rpcClient(t, s)
testutil.WaitForLeader(t, s.RPC)
c, cleanupC := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s.config.RPCAddr.String()}
})
defer cleanupC()
testutil.WaitForResult(func() (bool, error) {
nodes := s.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
require.Fail("should have a client")
})
req := &cstructs.ClientCSIControllerAttachVolumeRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.AttachVolume", req, &resp)
require.NotNil(err)
// Should recieve an error from the client endpoint
require.Contains(err.Error(), "must specify plugin name to dispense")
}
func TestClientCSIController_AttachVolume_Forwarded(t *testing.T) {
t.Parallel()
require := require.New(t)
// Start a server and client
s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
s2, cleanupS2 := TestServer(t, func(c *Config) {
c.DevDisableBootstrap = true
})
defer cleanupS2()
TestJoin(t, s1, s2)
testutil.WaitForLeader(t, s1.RPC)
testutil.WaitForLeader(t, s2.RPC)
codec := rpcClient(t, s2)
c, cleanupC := client.TestClient(t, func(c *config.Config) {
c.Servers = []string{s2.config.RPCAddr.String()}
c.GCDiskUsageThreshold = 100.0
})
defer cleanupC()
testutil.WaitForResult(func() (bool, error) {
nodes := s2.connectedNodes()
return len(nodes) == 1, nil
}, func(err error) {
require.Fail("should have a client")
})
// Force remove the connection locally in case it exists
s1.nodeConnsLock.Lock()
delete(s1.nodeConns, c.NodeID())
s1.nodeConnsLock.Unlock()
req := &cstructs.ClientCSIControllerAttachVolumeRequest{
CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()},
}
// Fetch the response
var resp structs.GenericResponse
err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.AttachVolume", req, &resp)
require.NotNil(err)
// Should recieve an error from the client endpoint
require.Contains(err.Error(), "must specify plugin name to dispense")
}

View File

@@ -222,16 +222,26 @@ func (srv *Server) controllerValidateVolume(req *structs.CSIVolumeRegisterReques
// The plugin requires a controller. Now we do some validation of the Volume
// to ensure that the registered capabilities are valid and that the volume
// exists.
method := "ClientCSI.CSIControllerValidateVolume"
// plugin IDs are not scoped to region/DC but volumes are.
// so any node we get for a controller is already in the same region/DC
// for the volume.
nodeID, err := srv.nodeForControllerPlugin(plugin)
if err != nil || nodeID == "" {
return err
}
method := "ClientCSIController.ValidateVolume"
cReq := &cstructs.ClientCSIControllerValidateVolumeRequest{
PluginID: plugin.ID,
VolumeID: vol.ID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
}
cReq.PluginID = plugin.ID
cReq.ControllerNodeID = nodeID
cResp := &cstructs.ClientCSIControllerValidateVolumeResponse{}
return srv.csiControllerRPC(plugin, method, cReq, cResp)
return srv.RPC(method, cReq, cResp)
}
// Register registers a new volume
@@ -483,25 +493,42 @@ func (srv *Server) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, r
return nil
}
method := "ClientCSI.CSIControllerAttachVolume"
// plugin IDs are not scoped to region/DC but volumes are.
// so any node we get for a controller is already in the same region/DC
// for the volume.
nodeID, err := srv.nodeForControllerPlugin(plug)
if err != nil || nodeID == "" {
return err
}
targetNode, err := state.NodeByID(ws, alloc.NodeID)
if err != nil {
return err
}
if targetNode == nil {
return fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, alloc.NodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[plug.ID]
if !ok {
return fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}
method := "ClientCSIController.AttachVolume"
cReq := &cstructs.ClientCSIControllerAttachVolumeRequest{
PluginName: plug.ID,
VolumeID: req.VolumeID,
NodeID: alloc.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: req.Claim == structs.CSIVolumeClaimRead,
VolumeID: req.VolumeID,
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: req.Claim == structs.CSIVolumeClaimRead,
// TODO(tgross): we don't have a way of setting these yet.
// ref https://github.com/hashicorp/nomad/issues/7007
// MountOptions: vol.MountOptions,
}
cReq.PluginID = plug.ID
cReq.ControllerNodeID = nodeID
cResp := &cstructs.ClientCSIControllerAttachVolumeResponse{}
// CSI controller plugins can block for arbitrarily long times,
// but we need to make sure it completes before we can safely
// mark the volume as claimed and return to the client so it
// can do a `NodePublish`.
err = srv.csiControllerRPC(plug, method, cReq, cResp)
err = srv.RPC(method, cReq, cResp)
if err != nil {
return err
}
@@ -512,24 +539,43 @@ func (srv *Server) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, r
// controllerUnpublishVolume sends an unpublish request to the CSI
// controller plugin associated with a volume, if any.
// TODO: the only caller of this won't have an alloc pointer handy, should it be its own request arg type?
func (srv *Server) controllerUnpublishVolume(req *structs.CSIVolumeClaimRequest, nodeID string) error {
func (srv *Server) controllerUnpublishVolume(req *structs.CSIVolumeClaimRequest, targetNomadNodeID string) error {
plug, vol, err := srv.volAndPluginLookup(req.VolumeID)
if plug == nil || vol == nil || err != nil {
return err // possibly nil if no controller required
}
method := "ClientCSI.DetachVolume"
cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
PluginName: plug.ID,
VolumeID: req.VolumeID,
NodeID: nodeID,
}
err = srv.csiControllerRPC(plug, method, cReq,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
ws := memdb.NewWatchSet()
state := srv.State()
targetNode, err := state.NodeByID(ws, targetNomadNodeID)
if err != nil {
return err
}
return nil
if targetNode == nil {
return fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, targetNomadNodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[plug.ID]
if !ok {
return fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}
// plugin IDs are not scoped to region/DC but volumes are.
// so any node we get for a controller is already in the same region/DC
// for the volume.
nodeID, err := srv.nodeForControllerPlugin(plug)
if err != nil || nodeID == "" {
return err
}
method := "ClientCSIController.DetachVolume"
cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
VolumeID: req.VolumeID,
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
}
cReq.PluginID = plug.ID
cReq.ControllerNodeID = nodeID
return srv.RPC(method, cReq, &cstructs.ClientCSIControllerDetachVolumeResponse{})
}
func (srv *Server) volAndPluginLookup(volID string) (*structs.CSIPlugin, *structs.CSIVolume, error) {
@@ -560,24 +606,6 @@ func (srv *Server) volAndPluginLookup(volID string) (*structs.CSIPlugin, *struct
return plug, vol, nil
}
func (srv *Server) csiControllerRPC(plugin *structs.CSIPlugin, method string, args, reply interface{}) error {
// plugin IDs are not scoped to region/DC but volumes are.
// so any node we get for a controller is already in the same region/DC
// for the volume.
nodeID, err := srv.nodeForControllerPlugin(plugin)
if err != nil || nodeID == "" {
return err
}
err = findNodeConnAndForward(srv, nodeID, method, args, reply)
if err != nil {
return err
}
if replyErr, ok := reply.(error); ok {
return replyErr
}
return nil
}
// nodeForControllerPlugin returns the node ID for a random controller
// to load-balance long-blocking RPCs across client nodes.
func (srv *Server) nodeForControllerPlugin(plugin *structs.CSIPlugin) (string, error) {

View File

@@ -336,6 +336,14 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
RequiresControllerPlugin: true,
},
}
node.CSINodePlugins = map[string]*structs.CSIInfo{
"minnie": {PluginID: "minnie",
Healthy: true,
ControllerInfo: &structs.CSIControllerInfo{},
NodeInfo: &structs.CSINodeInfo{},
RequiresControllerPlugin: true,
},
}
err := state.UpsertNode(1002, node)
require.NoError(t, err)
vols := []*structs.CSIVolume{{
@@ -367,6 +375,7 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) {
}
claimResp := &structs.CSIVolumeClaimResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp)
// Because the node is not registered
require.EqualError(t, err, "No path to node")
}

View File

@@ -261,10 +261,11 @@ type endpoints struct {
Enterprise *EnterpriseEndpoints
// Client endpoints
ClientStats *ClientStats
FileSystem *FileSystem
Agent *Agent
ClientAllocations *ClientAllocations
ClientStats *ClientStats
FileSystem *FileSystem
Agent *Agent
ClientAllocations *ClientAllocations
ClientCSIController *ClientCSIController
}
// NewServer is used to construct a new Nomad server from the
@@ -1110,6 +1111,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
s.staticEndpoints.ClientStats = &ClientStats{srv: s, logger: s.logger.Named("client_stats")}
s.staticEndpoints.ClientAllocations = &ClientAllocations{srv: s, logger: s.logger.Named("client_allocs")}
s.staticEndpoints.ClientAllocations.register()
s.staticEndpoints.ClientCSIController = &ClientCSIController{srv: s, logger: s.logger.Named("client_csi")}
// Streaming endpoints
s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")}
@@ -1137,6 +1139,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) {
s.staticEndpoints.Enterprise.Register(server)
server.Register(s.staticEndpoints.ClientStats)
server.Register(s.staticEndpoints.ClientAllocations)
server.Register(s.staticEndpoints.ClientCSIController)
server.Register(s.staticEndpoints.FileSystem)
server.Register(s.staticEndpoints.Agent)