diff --git a/client/csi_controller_endpoint.go b/client/csi_controller_endpoint.go index 5aaf66b6e..f3a386ae4 100644 --- a/client/csi_controller_endpoint.go +++ b/client/csi_controller_endpoint.go @@ -70,7 +70,7 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV // In the future this may be expanded to request dynamic secrets for attachement. func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolumeRequest, resp *structs.ClientCSIControllerAttachVolumeResponse) error { defer metrics.MeasureSince([]string{"client", "csi_controller", "publish_volume"}, time.Now()) - plugin, err := c.findControllerPlugin(req.PluginName) + plugin, err := c.findControllerPlugin(req.PluginID) if err != nil { return err } @@ -85,8 +85,8 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum return errors.New("VolumeID is required") } - if req.NodeID == "" { - return errors.New("NodeID is required") + if req.ClientCSINodeID == "" { + return errors.New("ClientCSINodeID is required") } if !nstructs.ValidCSIVolumeAccessMode(req.AccessMode) { @@ -109,6 +109,10 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum return nil } +func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolumeRequest, resp *structs.ClientCSIControllerDetachVolumeResponse) error { + return fmt.Errorf("Unimplemented") +} + func (c *CSIController) findControllerPlugin(name string) (csi.CSIPlugin, error) { return c.findPlugin(dynamicplugins.PluginTypeCSIController, name) } diff --git a/client/csi_controller_endpoint_test.go b/client/csi_controller_endpoint_test.go index db801b1da..ca9129fb6 100644 --- a/client/csi_controller_endpoint_test.go +++ b/client/csi_controller_endpoint_test.go @@ -31,43 +31,53 @@ func TestCSIController_AttachVolume(t *testing.T) { { Name: "returns plugin not found errors", Request: &structs.ClientCSIControllerAttachVolumeRequest{ - PluginName: "some-garbage", + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: "some-garbage", + }, }, ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"), }, { Name: "validates volumeid is not empty", Request: &structs.ClientCSIControllerAttachVolumeRequest{ - PluginName: fakePlugin.Name, + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, }, ExpectedErr: errors.New("VolumeID is required"), }, { Name: "validates nodeid is not empty", Request: &structs.ClientCSIControllerAttachVolumeRequest{ - PluginName: fakePlugin.Name, - VolumeID: "1234-4321-1234-4321", + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, + VolumeID: "1234-4321-1234-4321", }, - ExpectedErr: errors.New("NodeID is required"), + ExpectedErr: errors.New("ClientCSINodeID is required"), }, { Name: "validates AccessMode", Request: &structs.ClientCSIControllerAttachVolumeRequest{ - PluginName: fakePlugin.Name, - VolumeID: "1234-4321-1234-4321", - NodeID: "abcde", - AccessMode: nstructs.CSIVolumeAccessMode("foo"), + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, + VolumeID: "1234-4321-1234-4321", + ClientCSINodeID: "abcde", + AccessMode: nstructs.CSIVolumeAccessMode("foo"), }, ExpectedErr: errors.New("Unknown access mode: foo"), }, { Name: "validates attachmentmode is not empty", Request: &structs.ClientCSIControllerAttachVolumeRequest{ - PluginName: fakePlugin.Name, - VolumeID: "1234-4321-1234-4321", - NodeID: "abcde", - AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader, - AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"), + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, + VolumeID: "1234-4321-1234-4321", + ClientCSINodeID: "abcde", + AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader, + AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"), }, ExpectedErr: errors.New("Unknown attachment mode: bar"), }, @@ -77,11 +87,13 @@ func TestCSIController_AttachVolume(t *testing.T) { fc.NextControllerPublishVolumeErr = errors.New("hello") }, Request: &structs.ClientCSIControllerAttachVolumeRequest{ - PluginName: fakePlugin.Name, - VolumeID: "1234-4321-1234-4321", - NodeID: "abcde", - AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter, - AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem, + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, + VolumeID: "1234-4321-1234-4321", + ClientCSINodeID: "abcde", + AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem, }, ExpectedErr: errors.New("hello"), }, @@ -91,11 +103,13 @@ func TestCSIController_AttachVolume(t *testing.T) { fc.NextControllerPublishVolumeResponse = &csi.ControllerPublishVolumeResponse{} }, Request: &structs.ClientCSIControllerAttachVolumeRequest{ - PluginName: fakePlugin.Name, - VolumeID: "1234-4321-1234-4321", - NodeID: "abcde", - AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter, - AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem, + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, + VolumeID: "1234-4321-1234-4321", + ClientCSINodeID: "abcde", + AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem, }, ExpectedResponse: &structs.ClientCSIControllerAttachVolumeResponse{}, }, @@ -107,11 +121,13 @@ func TestCSIController_AttachVolume(t *testing.T) { } }, Request: &structs.ClientCSIControllerAttachVolumeRequest{ - PluginName: fakePlugin.Name, - VolumeID: "1234-4321-1234-4321", - NodeID: "abcde", - AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter, - AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem, + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, + VolumeID: "1234-4321-1234-4321", + ClientCSINodeID: "abcde", + AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem, }, ExpectedResponse: &structs.ClientCSIControllerAttachVolumeResponse{ PublishContext: map[string]string{"foo": "bar"}, @@ -161,14 +177,18 @@ func TestCSIController_ValidateVolume(t *testing.T) { { Name: "validates volumeid is not empty", Request: &structs.ClientCSIControllerValidateVolumeRequest{ - PluginID: fakePlugin.Name, + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, }, ExpectedErr: errors.New("VolumeID is required"), }, { Name: "returns plugin not found errors", Request: &structs.ClientCSIControllerValidateVolumeRequest{ - PluginID: "some-garbage", + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: "some-garbage", + }, VolumeID: "foo", }, ExpectedErr: errors.New("plugin some-garbage for type csi-controller not found"), @@ -176,7 +196,9 @@ func TestCSIController_ValidateVolume(t *testing.T) { { Name: "validates attachmentmode", Request: &structs.ClientCSIControllerValidateVolumeRequest{ - PluginID: fakePlugin.Name, + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, VolumeID: "1234-4321-1234-4321", AttachmentMode: nstructs.CSIVolumeAttachmentMode("bar"), AccessMode: nstructs.CSIVolumeAccessModeMultiNodeReader, @@ -186,7 +208,9 @@ func TestCSIController_ValidateVolume(t *testing.T) { { Name: "validates AccessMode", Request: &structs.ClientCSIControllerValidateVolumeRequest{ - PluginID: fakePlugin.Name, + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, VolumeID: "1234-4321-1234-4321", AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem, AccessMode: nstructs.CSIVolumeAccessMode("foo"), @@ -199,7 +223,9 @@ func TestCSIController_ValidateVolume(t *testing.T) { fc.NextControllerValidateVolumeErr = errors.New("hello") }, Request: &structs.ClientCSIControllerValidateVolumeRequest{ - PluginID: fakePlugin.Name, + CSIControllerQuery: structs.CSIControllerQuery{ + PluginID: fakePlugin.Name, + }, VolumeID: "1234-4321-1234-4321", AccessMode: nstructs.CSIVolumeAccessModeSingleNodeWriter, AttachmentMode: nstructs.CSIVolumeAttachmentModeFilesystem, diff --git a/client/structs/csi.go b/client/structs/csi.go index 6b9f891be..d0e96a588 100644 --- a/client/structs/csi.go +++ b/client/structs/csi.go @@ -20,27 +20,36 @@ type CSIVolumeMountOptions struct { MountFlags []string } -type ClientCSIControllerValidateVolumeRequest struct { +// CSIControllerQuery is used to specify various flags for queries against CSI +// Controllers +type CSIControllerQuery struct { + // ControllerNodeID is the node that should be targeted by the request + ControllerNodeID string + + // PluginID is the plugin that should be targeted on the given node. PluginID string +} + +type ClientCSIControllerValidateVolumeRequest struct { VolumeID string AttachmentMode structs.CSIVolumeAttachmentMode AccessMode structs.CSIVolumeAccessMode + + CSIControllerQuery } type ClientCSIControllerValidateVolumeResponse struct { } type ClientCSIControllerAttachVolumeRequest struct { - PluginName string - // The ID of the volume to be used on a node. // This field is REQUIRED. VolumeID string // The ID of the node. This field is REQUIRED. This must match the NodeID that // is fingerprinted by the target node for this plugin name. - NodeID string + ClientCSINodeID string // AttachmentMode indicates how the volume should be attached and mounted into // a task. @@ -56,6 +65,8 @@ type ClientCSIControllerAttachVolumeRequest struct { // ReadOnly indicates that the volume will be used in a readonly fashion. This // only works when the Controller has the PublishReadonly capability. ReadOnly bool + + CSIControllerQuery } func (c *ClientCSIControllerAttachVolumeRequest) ToCSIRequest() *csi.ControllerPublishVolumeRequest { @@ -65,7 +76,7 @@ func (c *ClientCSIControllerAttachVolumeRequest) ToCSIRequest() *csi.ControllerP return &csi.ControllerPublishVolumeRequest{ VolumeID: c.VolumeID, - NodeID: c.NodeID, + NodeID: c.ClientCSINodeID, ReadOnly: c.ReadOnly, } } @@ -88,15 +99,16 @@ type ClientCSIControllerAttachVolumeResponse struct { } type ClientCSIControllerDetachVolumeRequest struct { - PluginName string - // The ID of the volume to be unpublished for the node // This field is REQUIRED. VolumeID string - // The ID of the node. This field is REQUIRED. This must match the NodeID that - // is fingerprinted by the target node for this plugin name. - NodeID string + // The CSI Node ID for the Node that the volume should be detached from. + // This field is REQUIRED. This must match the NodeID that is fingerprinted + // by the target node for this plugin name. + ClientCSINodeID string + + CSIControllerQuery } func (c *ClientCSIControllerDetachVolumeRequest) ToCSIRequest() *csi.ControllerUnpublishVolumeRequest { @@ -106,7 +118,7 @@ func (c *ClientCSIControllerDetachVolumeRequest) ToCSIRequest() *csi.ControllerU return &csi.ControllerUnpublishVolumeRequest{ VolumeID: c.VolumeID, - NodeID: c.NodeID, + NodeID: c.ClientCSINodeID, } } diff --git a/nomad/client_csi_endpoint.go b/nomad/client_csi_endpoint.go new file mode 100644 index 000000000..061e6493d --- /dev/null +++ b/nomad/client_csi_endpoint.go @@ -0,0 +1,75 @@ +package nomad + +import ( + "errors" + "time" + + metrics "github.com/armon/go-metrics" + log "github.com/hashicorp/go-hclog" + cstructs "github.com/hashicorp/nomad/client/structs" +) + +// ClientCSIController is used to forward RPC requests to the targed Nomad client's +// CSIController endpoint. +type ClientCSIController struct { + srv *Server + logger log.Logger +} + +func (a *ClientCSIController) AttachVolume(args *cstructs.ClientCSIControllerAttachVolumeRequest, reply *cstructs.ClientCSIControllerAttachVolumeResponse) error { + defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "attach_volume"}, time.Now()) + + // Verify the arguments. + if args.ControllerNodeID == "" { + return errors.New("missing ControllerNodeID") + } + + // Make sure Node is valid and new enough to support RPC + snap, err := a.srv.State().Snapshot() + if err != nil { + return err + } + + _, err = getNodeForRpc(snap, args.ControllerNodeID) + if err != nil { + return err + } + + // Get the connection to the client + state, ok := a.srv.getNodeConn(args.ControllerNodeID) + if !ok { + return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.AttachVolume", args, reply) + } + + // Make the RPC + return NodeRpc(state.Session, "CSIController.AttachVolume", args, reply) +} + +func (a *ClientCSIController) ValidateVolume(args *cstructs.ClientCSIControllerValidateVolumeRequest, reply *cstructs.ClientCSIControllerValidateVolumeResponse) error { + defer metrics.MeasureSince([]string{"nomad", "client_csi_controller", "validate_volume"}, time.Now()) + + // Verify the arguments. + if args.ControllerNodeID == "" { + return errors.New("missing ControllerNodeID") + } + + // Make sure Node is valid and new enough to support RPC + snap, err := a.srv.State().Snapshot() + if err != nil { + return err + } + + _, err = getNodeForRpc(snap, args.ControllerNodeID) + if err != nil { + return err + } + + // Get the connection to the client + state, ok := a.srv.getNodeConn(args.ControllerNodeID) + if !ok { + return findNodeConnAndForward(a.srv, args.ControllerNodeID, "ClientCSIController.ValidateVolume", args, reply) + } + + // Make the RPC + return NodeRpc(state.Session, "CSIController.ValidateVolume", args, reply) +} diff --git a/nomad/client_csi_endpoint_test.go b/nomad/client_csi_endpoint_test.go new file mode 100644 index 000000000..48774fc12 --- /dev/null +++ b/nomad/client_csi_endpoint_test.go @@ -0,0 +1,93 @@ +package nomad + +import ( + "testing" + + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/hashicorp/nomad/client" + "github.com/hashicorp/nomad/client/config" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" + "github.com/stretchr/testify/require" +) + +func TestClientCSIController_AttachVolume_Local(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s, cleanupS := TestServer(t, nil) + defer cleanupS() + codec := rpcClient(t, s) + testutil.WaitForLeader(t, s.RPC) + + c, cleanupC := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s.config.RPCAddr.String()} + }) + defer cleanupC() + + testutil.WaitForResult(func() (bool, error) { + nodes := s.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + require.Fail("should have a client") + }) + + req := &cstructs.ClientCSIControllerAttachVolumeRequest{ + CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()}, + } + + // Fetch the response + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.AttachVolume", req, &resp) + require.NotNil(err) + // Should recieve an error from the client endpoint + require.Contains(err.Error(), "must specify plugin name to dispense") +} + +func TestClientCSIController_AttachVolume_Forwarded(t *testing.T) { + t.Parallel() + require := require.New(t) + + // Start a server and client + s1, cleanupS1 := TestServer(t, nil) + defer cleanupS1() + s2, cleanupS2 := TestServer(t, func(c *Config) { + c.DevDisableBootstrap = true + }) + defer cleanupS2() + TestJoin(t, s1, s2) + testutil.WaitForLeader(t, s1.RPC) + testutil.WaitForLeader(t, s2.RPC) + codec := rpcClient(t, s2) + + c, cleanupC := client.TestClient(t, func(c *config.Config) { + c.Servers = []string{s2.config.RPCAddr.String()} + c.GCDiskUsageThreshold = 100.0 + }) + defer cleanupC() + + testutil.WaitForResult(func() (bool, error) { + nodes := s2.connectedNodes() + return len(nodes) == 1, nil + }, func(err error) { + require.Fail("should have a client") + }) + + // Force remove the connection locally in case it exists + s1.nodeConnsLock.Lock() + delete(s1.nodeConns, c.NodeID()) + s1.nodeConnsLock.Unlock() + + req := &cstructs.ClientCSIControllerAttachVolumeRequest{ + CSIControllerQuery: cstructs.CSIControllerQuery{ControllerNodeID: c.NodeID()}, + } + + // Fetch the response + var resp structs.GenericResponse + err := msgpackrpc.CallWithCodec(codec, "ClientCSIController.AttachVolume", req, &resp) + require.NotNil(err) + // Should recieve an error from the client endpoint + require.Contains(err.Error(), "must specify plugin name to dispense") +} diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index 4d7b9919a..91aa6a943 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -222,16 +222,26 @@ func (srv *Server) controllerValidateVolume(req *structs.CSIVolumeRegisterReques // The plugin requires a controller. Now we do some validation of the Volume // to ensure that the registered capabilities are valid and that the volume // exists. - method := "ClientCSI.CSIControllerValidateVolume" + + // plugin IDs are not scoped to region/DC but volumes are. + // so any node we get for a controller is already in the same region/DC + // for the volume. + nodeID, err := srv.nodeForControllerPlugin(plugin) + if err != nil || nodeID == "" { + return err + } + + method := "ClientCSIController.ValidateVolume" cReq := &cstructs.ClientCSIControllerValidateVolumeRequest{ - PluginID: plugin.ID, VolumeID: vol.ID, AttachmentMode: vol.AttachmentMode, AccessMode: vol.AccessMode, } + cReq.PluginID = plugin.ID + cReq.ControllerNodeID = nodeID cResp := &cstructs.ClientCSIControllerValidateVolumeResponse{} - return srv.csiControllerRPC(plugin, method, cReq, cResp) + return srv.RPC(method, cReq, cResp) } // Register registers a new volume @@ -483,25 +493,42 @@ func (srv *Server) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, r return nil } - method := "ClientCSI.CSIControllerAttachVolume" + // plugin IDs are not scoped to region/DC but volumes are. + // so any node we get for a controller is already in the same region/DC + // for the volume. + nodeID, err := srv.nodeForControllerPlugin(plug) + if err != nil || nodeID == "" { + return err + } + + targetNode, err := state.NodeByID(ws, alloc.NodeID) + if err != nil { + return err + } + if targetNode == nil { + return fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, alloc.NodeID) + } + targetCSIInfo, ok := targetNode.CSINodePlugins[plug.ID] + if !ok { + return fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID) + } + + method := "ClientCSIController.AttachVolume" cReq := &cstructs.ClientCSIControllerAttachVolumeRequest{ - PluginName: plug.ID, - VolumeID: req.VolumeID, - NodeID: alloc.NodeID, - AttachmentMode: vol.AttachmentMode, - AccessMode: vol.AccessMode, - ReadOnly: req.Claim == structs.CSIVolumeClaimRead, + VolumeID: req.VolumeID, + ClientCSINodeID: targetCSIInfo.NodeInfo.ID, + AttachmentMode: vol.AttachmentMode, + AccessMode: vol.AccessMode, + ReadOnly: req.Claim == structs.CSIVolumeClaimRead, // TODO(tgross): we don't have a way of setting these yet. // ref https://github.com/hashicorp/nomad/issues/7007 // MountOptions: vol.MountOptions, } + cReq.PluginID = plug.ID + cReq.ControllerNodeID = nodeID cResp := &cstructs.ClientCSIControllerAttachVolumeResponse{} - // CSI controller plugins can block for arbitrarily long times, - // but we need to make sure it completes before we can safely - // mark the volume as claimed and return to the client so it - // can do a `NodePublish`. - err = srv.csiControllerRPC(plug, method, cReq, cResp) + err = srv.RPC(method, cReq, cResp) if err != nil { return err } @@ -512,24 +539,43 @@ func (srv *Server) controllerPublishVolume(req *structs.CSIVolumeClaimRequest, r // controllerUnpublishVolume sends an unpublish request to the CSI // controller plugin associated with a volume, if any. // TODO: the only caller of this won't have an alloc pointer handy, should it be its own request arg type? -func (srv *Server) controllerUnpublishVolume(req *structs.CSIVolumeClaimRequest, nodeID string) error { +func (srv *Server) controllerUnpublishVolume(req *structs.CSIVolumeClaimRequest, targetNomadNodeID string) error { plug, vol, err := srv.volAndPluginLookup(req.VolumeID) if plug == nil || vol == nil || err != nil { return err // possibly nil if no controller required } - method := "ClientCSI.DetachVolume" - cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{ - PluginName: plug.ID, - VolumeID: req.VolumeID, - NodeID: nodeID, - } - err = srv.csiControllerRPC(plug, method, cReq, - &cstructs.ClientCSIControllerDetachVolumeResponse{}) + ws := memdb.NewWatchSet() + state := srv.State() + + targetNode, err := state.NodeByID(ws, targetNomadNodeID) if err != nil { return err } - return nil + if targetNode == nil { + return fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, targetNomadNodeID) + } + targetCSIInfo, ok := targetNode.CSINodePlugins[plug.ID] + if !ok { + return fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID) + } + + // plugin IDs are not scoped to region/DC but volumes are. + // so any node we get for a controller is already in the same region/DC + // for the volume. + nodeID, err := srv.nodeForControllerPlugin(plug) + if err != nil || nodeID == "" { + return err + } + + method := "ClientCSIController.DetachVolume" + cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{ + VolumeID: req.VolumeID, + ClientCSINodeID: targetCSIInfo.NodeInfo.ID, + } + cReq.PluginID = plug.ID + cReq.ControllerNodeID = nodeID + return srv.RPC(method, cReq, &cstructs.ClientCSIControllerDetachVolumeResponse{}) } func (srv *Server) volAndPluginLookup(volID string) (*structs.CSIPlugin, *structs.CSIVolume, error) { @@ -560,24 +606,6 @@ func (srv *Server) volAndPluginLookup(volID string) (*structs.CSIPlugin, *struct return plug, vol, nil } -func (srv *Server) csiControllerRPC(plugin *structs.CSIPlugin, method string, args, reply interface{}) error { - // plugin IDs are not scoped to region/DC but volumes are. - // so any node we get for a controller is already in the same region/DC - // for the volume. - nodeID, err := srv.nodeForControllerPlugin(plugin) - if err != nil || nodeID == "" { - return err - } - err = findNodeConnAndForward(srv, nodeID, method, args, reply) - if err != nil { - return err - } - if replyErr, ok := reply.(error); ok { - return replyErr - } - return nil -} - // nodeForControllerPlugin returns the node ID for a random controller // to load-balance long-blocking RPCs across client nodes. func (srv *Server) nodeForControllerPlugin(plugin *structs.CSIPlugin) (string, error) { diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index 8c1905084..a3320c836 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -336,6 +336,14 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) { RequiresControllerPlugin: true, }, } + node.CSINodePlugins = map[string]*structs.CSIInfo{ + "minnie": {PluginID: "minnie", + Healthy: true, + ControllerInfo: &structs.CSIControllerInfo{}, + NodeInfo: &structs.CSINodeInfo{}, + RequiresControllerPlugin: true, + }, + } err := state.UpsertNode(1002, node) require.NoError(t, err) vols := []*structs.CSIVolume{{ @@ -367,6 +375,7 @@ func TestCSIVolumeEndpoint_ClaimWithController(t *testing.T) { } claimResp := &structs.CSIVolumeClaimResponse{} err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", claimReq, claimResp) + // Because the node is not registered require.EqualError(t, err, "No path to node") } diff --git a/nomad/server.go b/nomad/server.go index 41ffe8cbe..921d9901b 100644 --- a/nomad/server.go +++ b/nomad/server.go @@ -261,10 +261,11 @@ type endpoints struct { Enterprise *EnterpriseEndpoints // Client endpoints - ClientStats *ClientStats - FileSystem *FileSystem - Agent *Agent - ClientAllocations *ClientAllocations + ClientStats *ClientStats + FileSystem *FileSystem + Agent *Agent + ClientAllocations *ClientAllocations + ClientCSIController *ClientCSIController } // NewServer is used to construct a new Nomad server from the @@ -1110,6 +1111,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.ClientStats = &ClientStats{srv: s, logger: s.logger.Named("client_stats")} s.staticEndpoints.ClientAllocations = &ClientAllocations{srv: s, logger: s.logger.Named("client_allocs")} s.staticEndpoints.ClientAllocations.register() + s.staticEndpoints.ClientCSIController = &ClientCSIController{srv: s, logger: s.logger.Named("client_csi")} // Streaming endpoints s.staticEndpoints.FileSystem = &FileSystem{srv: s, logger: s.logger.Named("client_fs")} @@ -1137,6 +1139,7 @@ func (s *Server) setupRpcServer(server *rpc.Server, ctx *RPCContext) { s.staticEndpoints.Enterprise.Register(server) server.Register(s.staticEndpoints.ClientStats) server.Register(s.staticEndpoints.ClientAllocations) + server.Register(s.staticEndpoints.ClientCSIController) server.Register(s.staticEndpoints.FileSystem) server.Register(s.staticEndpoints.Agent)