From 9f73f09f571e0ddfaa7b537eab438962f0191043 Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Fri, 24 Jan 2020 17:20:23 +0100 Subject: [PATCH] csi: Add NodePublishVolume RPCs --- plugins/csi/client.go | 20 +++++++++- plugins/csi/client_test.go | 57 ++++++++++++++++++++++++++++ plugins/csi/fake/client.go | 12 ++++++ plugins/csi/plugin.go | 70 +++++++++++++++++++++++++++++++++++ plugins/csi/testing/client.go | 8 +++- 5 files changed, 165 insertions(+), 2 deletions(-) diff --git a/plugins/csi/client.go b/plugins/csi/client.go index b3442d7d4..80685b9fb 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -69,6 +69,7 @@ type CSINodeClient interface { NodeGetInfo(ctx context.Context, in *csipbv1.NodeGetInfoRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetInfoResponse, error) NodeStageVolume(ctx context.Context, in *csipbv1.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeStageVolumeResponse, error) NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error) + NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error) } type client struct { @@ -334,7 +335,6 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging if c.nodeClient == nil { return fmt.Errorf("Client not initialized") } - // These errors should not be returned during production use but exist as aids // during Nomad Development if volumeID == "" { @@ -354,3 +354,21 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging _, err := c.nodeClient.NodeUnstageVolume(ctx, req) return err } + +func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest) error { + if c == nil { + return fmt.Errorf("Client not initialized") + } + if c.nodeClient == nil { + return fmt.Errorf("Client not initialized") + } + + if err := req.Validate(); err != nil { + return fmt.Errorf("validation error: %v", err) + } + + // NodePublishVolume's response contains no extra data. If err == nil, we were + // successful. + _, err := c.nodeClient.NodePublishVolume(ctx, req.ToCSIRepresentation()) + return err +} diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index 237f362cc..23da49b77 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -2,6 +2,7 @@ package csi import ( "context" + "errors" "fmt" "testing" @@ -482,3 +483,59 @@ func TestClient_RPC_NodeUnstageVolume(t *testing.T) { }) } } + +func TestClient_RPC_NodePublishVolume(t *testing.T) { + cases := []struct { + Name string + Request *NodePublishVolumeRequest + ResponseErr error + Response *csipbv1.NodePublishVolumeResponse + ExpectedErr error + }{ + { + Name: "handles underlying grpc errors", + Request: &NodePublishVolumeRequest{ + VolumeID: "foo", + TargetPath: "/dev/null", + VolumeCapability: &VolumeCapability{}, + }, + ResponseErr: fmt.Errorf("some grpc error"), + ExpectedErr: fmt.Errorf("some grpc error"), + }, + { + Name: "handles success", + Request: &NodePublishVolumeRequest{ + VolumeID: "foo", + TargetPath: "/dev/null", + VolumeCapability: &VolumeCapability{}, + }, + ResponseErr: nil, + ExpectedErr: nil, + }, + { + Name: "Performs validation of the publish volume request", + Request: &NodePublishVolumeRequest{ + VolumeID: "", + }, + ResponseErr: nil, + ExpectedErr: errors.New("missing VolumeID"), + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + _, _, nc, client := newTestClient() + defer client.Close() + + nc.NextErr = c.ResponseErr + nc.NextPublishVolumeResponse = c.Response + + err := client.NodePublishVolume(context.TODO(), c.Request) + if c.ExpectedErr != nil { + require.Error(t, c.ExpectedErr, err) + } else { + require.Nil(t, err) + } + }) + } +} diff --git a/plugins/csi/fake/client.go b/plugins/csi/fake/client.go index 29b9b9081..808e7bb37 100644 --- a/plugins/csi/fake/client.go +++ b/plugins/csi/fake/client.go @@ -56,6 +56,9 @@ type Client struct { NextNodeUnstageVolumeErr error NodeUnstageVolumeCallCount int64 + + NextNodePublishVolumeErr error + NodePublishVolumeCallCount int64 } // PluginInfo describes the type and version of a plugin. @@ -178,6 +181,15 @@ func (c *Client) NodeUnstageVolume(ctx context.Context, volumeID string, staging return c.NextNodeUnstageVolumeErr } +func (c *Client) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) error { + c.Mu.Lock() + defer c.Mu.Unlock() + + c.NodePublishVolumeCallCount++ + + return c.NextNodePublishVolumeErr +} + // Shutdown the client and ensure any connections are cleaned up. func (c *Client) Close() error { return nil diff --git a/plugins/csi/plugin.go b/plugins/csi/plugin.go index 9e9b622ef..a9eb6b5ce 100644 --- a/plugins/csi/plugin.go +++ b/plugins/csi/plugin.go @@ -2,6 +2,7 @@ package csi import ( "context" + "errors" "fmt" csipbv1 "github.com/container-storage-interface/spec/lib/go/csi" @@ -54,10 +55,79 @@ type CSIPlugin interface { // If err == nil, the response should be assumed to be successful. NodeUnstageVolume(ctx context.Context, volumeID string, stagingTargetPath string) error + // NodePublishVolume is used to prepare a volume for use by an allocation. + // if err == nil the response should be assumed to be successful. + NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest) error + // Shutdown the client and ensure any connections are cleaned up. Close() error } +type NodePublishVolumeRequest struct { + // The ID of the volume to publish. + VolumeID string + + // If the volume was attached via a call to `ControllerPublishVolume` then + // we need to provide the returned PublishContext here. + PublishContext map[string]string + + // The path to which the volume was staged by `NodeStageVolume`. + // It MUST be an absolute path in the root filesystem of the process + // serving this request. + // E.g {the plugins internal mount path}/staging/volumeid/... + // + // It MUST be set if the Node Plugin implements the + // `STAGE_UNSTAGE_VOLUME` node capability. + StagingTargetPath string + + // The path to which the volume will be published. + // It MUST be an absolute path in the root filesystem of the process serving this + // request. + // E.g {the plugins internal mount path}/per-alloc/allocid/volumeid/... + // + // The CO SHALL ensure uniqueness of target_path per volume. + // The CO SHALL ensure that the parent directory of this path exists + // and that the process serving the request has `read` and `write` + // permissions to that parent directory. + TargetPath string + + // Volume capability describing how the CO intends to use this volume. + VolumeCapability *VolumeCapability + + Readonly bool + + // Reserved for future use. + Secrets map[string]string +} + +func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVolumeRequest { + return &csipbv1.NodePublishVolumeRequest{ + VolumeId: r.VolumeID, + PublishContext: r.PublishContext, + StagingTargetPath: r.StagingTargetPath, + TargetPath: r.TargetPath, + VolumeCapability: r.VolumeCapability.ToCSIRepresentation(), + Readonly: r.Readonly, + Secrets: r.Secrets, + } +} + +func (r *NodePublishVolumeRequest) Validate() error { + if r.VolumeID == "" { + return errors.New("missing VolumeID") + } + + if r.TargetPath == "" { + return errors.New("missing TargetPath") + } + + if r.VolumeCapability == nil { + return errors.New("missing VolumeCapabilities") + } + + return nil +} + type PluginCapabilitySet struct { hasControllerService bool hasTopologies bool diff --git a/plugins/csi/testing/client.go b/plugins/csi/testing/client.go index 77d2fd3e0..f1d22f19d 100644 --- a/plugins/csi/testing/client.go +++ b/plugins/csi/testing/client.go @@ -85,9 +85,10 @@ type NodeClient struct { NextGetInfoResponse *csipbv1.NodeGetInfoResponse NextStageVolumeResponse *csipbv1.NodeStageVolumeResponse NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse + NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse } -// NewNodeClient returns a new ControllerClient +// NewNodeClient returns a new stub NodeClient func NewNodeClient() *NodeClient { return &NodeClient{} } @@ -98,6 +99,7 @@ func (f *NodeClient) Reset() { f.NextGetInfoResponse = nil f.NextStageVolumeResponse = nil f.NextUnstageVolumeResponse = nil + f.NextPublishVolumeResponse = nil } func (c *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipbv1.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetCapabilitiesResponse, error) { @@ -115,3 +117,7 @@ func (c *NodeClient) NodeStageVolume(ctx context.Context, in *csipbv1.NodeStageV func (c *NodeClient) NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error) { return c.NextUnstageVolumeResponse, c.NextErr } + +func (c *NodeClient) NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error) { + return c.NextPublishVolumeResponse, c.NextErr +}