mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 19:05:42 +03:00
csi: Support for NodeUnpublishVolume RPCs
This commit is contained in:
committed by
Tim Gross
parent
a84685bf11
commit
77f3ef345b
@@ -70,6 +70,7 @@ type CSINodeClient interface {
|
||||
NodeStageVolume(ctx context.Context, in *csipbv1.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeStageVolumeResponse, error)
|
||||
NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error)
|
||||
NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error)
|
||||
NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error)
|
||||
}
|
||||
|
||||
type client struct {
|
||||
@@ -372,3 +373,30 @@ func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRe
|
||||
_, err := c.nodeClient.NodePublishVolume(ctx, req.ToCSIRepresentation())
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string) error {
|
||||
if c == nil {
|
||||
return fmt.Errorf("Client not initialized")
|
||||
}
|
||||
if c.nodeClient == nil {
|
||||
return fmt.Errorf("Client not initialized")
|
||||
}
|
||||
|
||||
if volumeID == "" {
|
||||
return fmt.Errorf("missing VolumeID")
|
||||
}
|
||||
|
||||
if targetPath == "" {
|
||||
return fmt.Errorf("missing TargetPath")
|
||||
}
|
||||
|
||||
req := &csipbv1.NodeUnpublishVolumeRequest{
|
||||
VolumeId: volumeID,
|
||||
TargetPath: targetPath,
|
||||
}
|
||||
|
||||
// NodeUnpublishVolume's response contains no extra data. If err == nil, we were
|
||||
// successful.
|
||||
_, err := c.nodeClient.NodeUnpublishVolume(ctx, req)
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -539,3 +539,56 @@ func TestClient_RPC_NodePublishVolume(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
func TestClient_RPC_NodeUnpublishVolume(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
VolumeID string
|
||||
TargetPath string
|
||||
ResponseErr error
|
||||
Response *csipbv1.NodeUnpublishVolumeResponse
|
||||
ExpectedErr error
|
||||
}{
|
||||
{
|
||||
Name: "handles underlying grpc errors",
|
||||
VolumeID: "foo",
|
||||
TargetPath: "/dev/null",
|
||||
ResponseErr: fmt.Errorf("some grpc error"),
|
||||
ExpectedErr: fmt.Errorf("some grpc error"),
|
||||
},
|
||||
{
|
||||
Name: "handles success",
|
||||
VolumeID: "foo",
|
||||
TargetPath: "/dev/null",
|
||||
ResponseErr: nil,
|
||||
ExpectedErr: nil,
|
||||
},
|
||||
{
|
||||
Name: "Performs validation of the request args - VolumeID",
|
||||
ResponseErr: nil,
|
||||
ExpectedErr: errors.New("missing VolumeID"),
|
||||
},
|
||||
{
|
||||
Name: "Performs validation of the request args - TargetPath",
|
||||
VolumeID: "foo",
|
||||
ResponseErr: nil,
|
||||
ExpectedErr: errors.New("missing TargetPath"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.Name, func(t *testing.T) {
|
||||
_, _, nc, client := newTestClient()
|
||||
defer client.Close()
|
||||
|
||||
nc.NextErr = c.ResponseErr
|
||||
nc.NextUnpublishVolumeResponse = c.Response
|
||||
|
||||
err := client.NodeUnpublishVolume(context.TODO(), c.VolumeID, c.TargetPath)
|
||||
if c.ExpectedErr != nil {
|
||||
require.Error(t, c.ExpectedErr, err)
|
||||
} else {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -59,6 +59,9 @@ type Client struct {
|
||||
|
||||
NextNodePublishVolumeErr error
|
||||
NodePublishVolumeCallCount int64
|
||||
|
||||
NextNodeUnpublishVolumeErr error
|
||||
NodeUnpublishVolumeCallCount int64
|
||||
}
|
||||
|
||||
// PluginInfo describes the type and version of a plugin.
|
||||
@@ -190,6 +193,15 @@ func (c *Client) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu
|
||||
return c.NextNodePublishVolumeErr
|
||||
}
|
||||
|
||||
func (c *Client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string) error {
|
||||
c.Mu.Lock()
|
||||
defer c.Mu.Unlock()
|
||||
|
||||
c.NodeUnpublishVolumeCallCount++
|
||||
|
||||
return c.NextNodeUnpublishVolumeErr
|
||||
}
|
||||
|
||||
// Shutdown the client and ensure any connections are cleaned up.
|
||||
func (c *Client) Close() error {
|
||||
return nil
|
||||
|
||||
@@ -59,6 +59,11 @@ type CSIPlugin interface {
|
||||
// if err == nil the response should be assumed to be successful.
|
||||
NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest) error
|
||||
|
||||
// NodeUnpublishVolume is used to cleanup usage of a volume for an alloc. This
|
||||
// MUST be called before calling NodeUnstageVolume or ControllerUnpublishVolume
|
||||
// for the given volume.
|
||||
NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string) error
|
||||
|
||||
// Shutdown the client and ensure any connections are cleaned up.
|
||||
Close() error
|
||||
}
|
||||
|
||||
@@ -80,12 +80,13 @@ func (c *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *c
|
||||
|
||||
// NodeClient is a CSI Node client used for testing
|
||||
type NodeClient struct {
|
||||
NextErr error
|
||||
NextCapabilitiesResponse *csipbv1.NodeGetCapabilitiesResponse
|
||||
NextGetInfoResponse *csipbv1.NodeGetInfoResponse
|
||||
NextStageVolumeResponse *csipbv1.NodeStageVolumeResponse
|
||||
NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse
|
||||
NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse
|
||||
NextErr error
|
||||
NextCapabilitiesResponse *csipbv1.NodeGetCapabilitiesResponse
|
||||
NextGetInfoResponse *csipbv1.NodeGetInfoResponse
|
||||
NextStageVolumeResponse *csipbv1.NodeStageVolumeResponse
|
||||
NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse
|
||||
NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse
|
||||
NextUnpublishVolumeResponse *csipbv1.NodeUnpublishVolumeResponse
|
||||
}
|
||||
|
||||
// NewNodeClient returns a new stub NodeClient
|
||||
@@ -100,6 +101,7 @@ func (f *NodeClient) Reset() {
|
||||
f.NextStageVolumeResponse = nil
|
||||
f.NextUnstageVolumeResponse = nil
|
||||
f.NextPublishVolumeResponse = nil
|
||||
f.NextUnpublishVolumeResponse = nil
|
||||
}
|
||||
|
||||
func (c *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipbv1.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetCapabilitiesResponse, error) {
|
||||
@@ -121,3 +123,7 @@ func (c *NodeClient) NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnst
|
||||
func (c *NodeClient) NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error) {
|
||||
return c.NextPublishVolumeResponse, c.NextErr
|
||||
}
|
||||
|
||||
func (c *NodeClient) NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error) {
|
||||
return c.NextUnpublishVolumeResponse, c.NextErr
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user