From 77f3ef345be36e84728653a4d20d9466c07ca47b Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Mon, 27 Jan 2020 11:17:10 +0100 Subject: [PATCH] csi: Support for NodeUnpublishVolume RPCs --- plugins/csi/client.go | 28 ++++++++++++++++++ plugins/csi/client_test.go | 53 +++++++++++++++++++++++++++++++++++ plugins/csi/fake/client.go | 12 ++++++++ plugins/csi/plugin.go | 5 ++++ plugins/csi/testing/client.go | 18 ++++++++---- 5 files changed, 110 insertions(+), 6 deletions(-) diff --git a/plugins/csi/client.go b/plugins/csi/client.go index 80685b9fb..0766c8e9a 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -70,6 +70,7 @@ type CSINodeClient interface { NodeStageVolume(ctx context.Context, in *csipbv1.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeStageVolumeResponse, error) NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error) NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error) + NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error) } type client struct { @@ -372,3 +373,30 @@ func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRe _, err := c.nodeClient.NodePublishVolume(ctx, req.ToCSIRepresentation()) return err } + +func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string) error { + if c == nil { + return fmt.Errorf("Client not initialized") + } + if c.nodeClient == nil { + return fmt.Errorf("Client not initialized") + } + + if volumeID == "" { + return fmt.Errorf("missing VolumeID") + } + + if targetPath == "" { + return fmt.Errorf("missing TargetPath") + } + + req := &csipbv1.NodeUnpublishVolumeRequest{ + VolumeId: volumeID, + TargetPath: targetPath, + } + + // NodeUnpublishVolume's response contains no extra data. If err == nil, we were + // successful. + _, err := c.nodeClient.NodeUnpublishVolume(ctx, req) + return err +} diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index 23da49b77..d458c58c5 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -539,3 +539,56 @@ func TestClient_RPC_NodePublishVolume(t *testing.T) { }) } } +func TestClient_RPC_NodeUnpublishVolume(t *testing.T) { + cases := []struct { + Name string + VolumeID string + TargetPath string + ResponseErr error + Response *csipbv1.NodeUnpublishVolumeResponse + ExpectedErr error + }{ + { + Name: "handles underlying grpc errors", + VolumeID: "foo", + TargetPath: "/dev/null", + ResponseErr: fmt.Errorf("some grpc error"), + ExpectedErr: fmt.Errorf("some grpc error"), + }, + { + Name: "handles success", + VolumeID: "foo", + TargetPath: "/dev/null", + ResponseErr: nil, + ExpectedErr: nil, + }, + { + Name: "Performs validation of the request args - VolumeID", + ResponseErr: nil, + ExpectedErr: errors.New("missing VolumeID"), + }, + { + Name: "Performs validation of the request args - TargetPath", + VolumeID: "foo", + ResponseErr: nil, + ExpectedErr: errors.New("missing TargetPath"), + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + _, _, nc, client := newTestClient() + defer client.Close() + + nc.NextErr = c.ResponseErr + nc.NextUnpublishVolumeResponse = c.Response + + err := client.NodeUnpublishVolume(context.TODO(), c.VolumeID, c.TargetPath) + if c.ExpectedErr != nil { + require.Error(t, c.ExpectedErr, err) + } else { + require.Nil(t, err) + } + }) + } +} diff --git a/plugins/csi/fake/client.go b/plugins/csi/fake/client.go index 808e7bb37..c8e0cfd55 100644 --- a/plugins/csi/fake/client.go +++ b/plugins/csi/fake/client.go @@ -59,6 +59,9 @@ type Client struct { NextNodePublishVolumeErr error NodePublishVolumeCallCount int64 + + NextNodeUnpublishVolumeErr error + NodeUnpublishVolumeCallCount int64 } // PluginInfo describes the type and version of a plugin. @@ -190,6 +193,15 @@ func (c *Client) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolu return c.NextNodePublishVolumeErr } +func (c *Client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string) error { + c.Mu.Lock() + defer c.Mu.Unlock() + + c.NodeUnpublishVolumeCallCount++ + + return c.NextNodeUnpublishVolumeErr +} + // Shutdown the client and ensure any connections are cleaned up. func (c *Client) Close() error { return nil diff --git a/plugins/csi/plugin.go b/plugins/csi/plugin.go index cd5ad7555..172aa01d7 100644 --- a/plugins/csi/plugin.go +++ b/plugins/csi/plugin.go @@ -59,6 +59,11 @@ type CSIPlugin interface { // if err == nil the response should be assumed to be successful. NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest) error + // NodeUnpublishVolume is used to cleanup usage of a volume for an alloc. This + // MUST be called before calling NodeUnstageVolume or ControllerUnpublishVolume + // for the given volume. + NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string) error + // Shutdown the client and ensure any connections are cleaned up. Close() error } diff --git a/plugins/csi/testing/client.go b/plugins/csi/testing/client.go index f1d22f19d..f28d9287e 100644 --- a/plugins/csi/testing/client.go +++ b/plugins/csi/testing/client.go @@ -80,12 +80,13 @@ func (c *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *c // NodeClient is a CSI Node client used for testing type NodeClient struct { - NextErr error - NextCapabilitiesResponse *csipbv1.NodeGetCapabilitiesResponse - NextGetInfoResponse *csipbv1.NodeGetInfoResponse - NextStageVolumeResponse *csipbv1.NodeStageVolumeResponse - NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse - NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse + NextErr error + NextCapabilitiesResponse *csipbv1.NodeGetCapabilitiesResponse + NextGetInfoResponse *csipbv1.NodeGetInfoResponse + NextStageVolumeResponse *csipbv1.NodeStageVolumeResponse + NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse + NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse + NextUnpublishVolumeResponse *csipbv1.NodeUnpublishVolumeResponse } // NewNodeClient returns a new stub NodeClient @@ -100,6 +101,7 @@ func (f *NodeClient) Reset() { f.NextStageVolumeResponse = nil f.NextUnstageVolumeResponse = nil f.NextPublishVolumeResponse = nil + f.NextUnpublishVolumeResponse = nil } func (c *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipbv1.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetCapabilitiesResponse, error) { @@ -121,3 +123,7 @@ func (c *NodeClient) NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnst func (c *NodeClient) NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error) { return c.NextPublishVolumeResponse, c.NextErr } + +func (c *NodeClient) NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error) { + return c.NextUnpublishVolumeResponse, c.NextErr +}