From 27e5cea0c5e1187926f5866366ad217fbdd2be28 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 10 Feb 2020 11:45:06 -0500 Subject: [PATCH] csi: implement CSI controller detach request/response (#7107) This changeset implements the minimal structs on the client-side we need to compile the work-in-progress implementation of the server-to-controller RPCs. It doesn't include implementing the `ClientCSI.DettachVolume` RPC on the client. --- client/structs/csi.go | 25 +++++++++++++++ plugins/csi/client.go | 30 +++++++++++++++--- plugins/csi/client_test.go | 62 ++++++++++++++++++++++++++++++++++++-- plugins/csi/fake/client.go | 14 +++++++++ plugins/csi/plugin.go | 57 ++++++++++++++++++++++++++++++++++- 5 files changed, 180 insertions(+), 8 deletions(-) diff --git a/client/structs/csi.go b/client/structs/csi.go index d08fecfe0..9616d1b78 100644 --- a/client/structs/csi.go +++ b/client/structs/csi.go @@ -75,3 +75,28 @@ type ClientCSIControllerAttachVolumeResponse struct { // subsequent `NodeStageVolume` or `NodePublishVolume` calls PublishContext map[string]string } + +type ClientCSIControllerDetachVolumeRequest struct { + PluginName string + + // The ID of the volume to be unpublished for the node + // This field is REQUIRED. + VolumeID string + + // The ID of the node. This field is REQUIRED. This must match the NodeID that + // is fingerprinted by the target node for this plugin name. + NodeID string +} + +func (c *ClientCSIControllerDetachVolumeRequest) ToCSIRequest() *csi.ControllerUnpublishVolumeRequest { + if c == nil { + return &csi.ControllerUnpublishVolumeRequest{} + } + + return &csi.ControllerUnpublishVolumeRequest{ + VolumeID: c.VolumeID, + NodeID: c.NodeID, + } +} + +type ClientCSIControllerDetachVolumeResponse struct{} diff --git a/plugins/csi/client.go b/plugins/csi/client.go index 16e9f485e..fe982d3f2 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -237,13 +237,12 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub return nil, fmt.Errorf("controllerClient not initialized") } - pbrequest := &csipbv1.ControllerPublishVolumeRequest{ - VolumeId: req.VolumeID, - NodeId: req.NodeID, - Readonly: req.ReadOnly, - //TODO: add capabilities + err := req.Validate() + if err != nil { + return nil, err } + pbrequest := req.ToCSIRepresentation() resp, err := c.controllerClient.ControllerPublishVolume(ctx, pbrequest) if err != nil { return nil, err @@ -254,6 +253,27 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub }, nil } +func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error) { + if c == nil { + return nil, fmt.Errorf("Client not initialized") + } + if c.controllerClient == nil { + return nil, fmt.Errorf("controllerClient not initialized") + } + err := req.Validate() + if err != nil { + return nil, err + } + + upbrequest := req.ToCSIRepresentation() + _, err = c.controllerClient.ControllerUnpublishVolume(ctx, upbrequest) + if err != nil { + return nil, err + } + + return &ControllerUnpublishVolumeResponse{}, nil +} + // // Node Endpoints // diff --git a/plugins/csi/client_test.go b/plugins/csi/client_test.go index d458c58c5..5d997722e 100644 --- a/plugins/csi/client_test.go +++ b/plugins/csi/client_test.go @@ -360,6 +360,7 @@ func TestClient_RPC_NodeGetCapabilities(t *testing.T) { func TestClient_RPC_ControllerPublishVolume(t *testing.T) { cases := []struct { Name string + Request *ControllerPublishVolumeRequest ResponseErr error Response *csipbv1.ControllerPublishVolumeResponse ExpectedResponse *ControllerPublishVolumeResponse @@ -367,16 +368,26 @@ func TestClient_RPC_ControllerPublishVolume(t *testing.T) { }{ { Name: "handles underlying grpc errors", + Request: &ControllerPublishVolumeRequest{}, ResponseErr: fmt.Errorf("some grpc error"), ExpectedErr: fmt.Errorf("some grpc error"), }, + { + Name: "Handles missing NodeID", + Request: &ControllerPublishVolumeRequest{}, + Response: &csipbv1.ControllerPublishVolumeResponse{}, + ExpectedErr: fmt.Errorf("missing NodeID"), + }, + { Name: "Handles PublishContext == nil", + Request: &ControllerPublishVolumeRequest{VolumeID: "vol", NodeID: "node"}, Response: &csipbv1.ControllerPublishVolumeResponse{}, ExpectedResponse: &ControllerPublishVolumeResponse{}, }, { - Name: "Handles PublishContext != nil", + Name: "Handles PublishContext != nil", + Request: &ControllerPublishVolumeRequest{VolumeID: "vol", NodeID: "node"}, Response: &csipbv1.ControllerPublishVolumeResponse{ PublishContext: map[string]string{ "com.hashicorp/nomad-node-id": "foobar", @@ -400,7 +411,54 @@ func TestClient_RPC_ControllerPublishVolume(t *testing.T) { cc.NextErr = c.ResponseErr cc.NextPublishVolumeResponse = c.Response - resp, err := client.ControllerPublishVolume(context.TODO(), &ControllerPublishVolumeRequest{}) + resp, err := client.ControllerPublishVolume(context.TODO(), c.Request) + if c.ExpectedErr != nil { + require.Error(t, c.ExpectedErr, err) + } + + require.Equal(t, c.ExpectedResponse, resp) + }) + } +} + +func TestClient_RPC_ControllerUnpublishVolume(t *testing.T) { + cases := []struct { + Name string + Request *ControllerUnpublishVolumeRequest + ResponseErr error + Response *csipbv1.ControllerUnpublishVolumeResponse + ExpectedResponse *ControllerUnpublishVolumeResponse + ExpectedErr error + }{ + { + Name: "Handles underlying grpc errors", + Request: &ControllerUnpublishVolumeRequest{}, + ResponseErr: fmt.Errorf("some grpc error"), + ExpectedErr: fmt.Errorf("some grpc error"), + }, + { + Name: "Handles missing NodeID", + Request: &ControllerUnpublishVolumeRequest{}, + ExpectedErr: fmt.Errorf("missing NodeID"), + ExpectedResponse: nil, + }, + { + Name: "Handles successful response", + Request: &ControllerUnpublishVolumeRequest{VolumeID: "vol", NodeID: "node"}, + ExpectedErr: fmt.Errorf("missing NodeID"), + ExpectedResponse: &ControllerUnpublishVolumeResponse{}, + }, + } + + for _, c := range cases { + t.Run(c.Name, func(t *testing.T) { + _, cc, _, client := newTestClient() + defer client.Close() + + cc.NextErr = c.ResponseErr + cc.NextUnpublishVolumeResponse = c.Response + + resp, err := client.ControllerUnpublishVolume(context.TODO(), c.Request) if c.ExpectedErr != nil { require.Error(t, c.ExpectedErr, err) } diff --git a/plugins/csi/fake/client.go b/plugins/csi/fake/client.go index 22eecc55e..ea7513b8f 100644 --- a/plugins/csi/fake/client.go +++ b/plugins/csi/fake/client.go @@ -44,6 +44,10 @@ type Client struct { NextControllerPublishVolumeErr error ControllerPublishVolumeCallCount int64 + NextControllerUnpublishVolumeResponse *csi.ControllerUnpublishVolumeResponse + NextControllerUnpublishVolumeErr error + ControllerUnpublishVolumeCallCount int64 + NextNodeGetCapabilitiesResponse *csi.NodeCapabilitySet NextNodeGetCapabilitiesErr error NodeGetCapabilitiesCallCount int64 @@ -139,6 +143,16 @@ func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.Controlle return c.NextControllerPublishVolumeResponse, c.NextControllerPublishVolumeErr } +// ControllerUnpublishVolume is used to attach a remote volume to a node +func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { + c.Mu.Lock() + defer c.Mu.Unlock() + + c.ControllerUnpublishVolumeCallCount++ + + return c.NextControllerUnpublishVolumeResponse, c.NextControllerUnpublishVolumeErr +} + func (c *Client) NodeGetCapabilities(ctx context.Context) (*csi.NodeCapabilitySet, error) { c.Mu.Lock() defer c.Mu.Unlock() diff --git a/plugins/csi/plugin.go b/plugins/csi/plugin.go index c39990c05..56dc90b63 100644 --- a/plugins/csi/plugin.go +++ b/plugins/csi/plugin.go @@ -36,6 +36,9 @@ type CSIPlugin interface { // ControllerPublishVolume is used to attach a remote volume to a cluster node. ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error) + // ControllerUnpublishVolume is used to deattach a remote volume from a cluster node. + ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error) + // NodeGetCapabilities is used to return the available capabilities from the // Node Service. NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, error) @@ -223,14 +226,66 @@ type ControllerPublishVolumeRequest struct { VolumeID string NodeID string ReadOnly bool - //TODO: Add Capabilities } +func (r *ControllerPublishVolumeRequest) ToCSIRepresentation() *csipbv1.ControllerPublishVolumeRequest { + if r == nil { + return nil + } + + return &csipbv1.ControllerPublishVolumeRequest{ + VolumeId: r.VolumeID, + NodeId: r.NodeID, + Readonly: r.ReadOnly, + // TODO: add capabilities + } +} + +func (r *ControllerPublishVolumeRequest) Validate() error { + if r.VolumeID == "" { + return errors.New("missing VolumeID") + } + if r.NodeID == "" { + return errors.New("missing NodeID") + } + return nil +} + type ControllerPublishVolumeResponse struct { PublishContext map[string]string } +type ControllerUnpublishVolumeRequest struct { + VolumeID string + NodeID string +} + +func (r *ControllerUnpublishVolumeRequest) ToCSIRepresentation() *csipbv1.ControllerUnpublishVolumeRequest { + if r == nil { + return nil + } + + return &csipbv1.ControllerUnpublishVolumeRequest{ + VolumeId: r.VolumeID, + NodeId: r.NodeID, + } +} + +func (r *ControllerUnpublishVolumeRequest) Validate() error { + if r.VolumeID == "" { + return errors.New("missing VolumeID") + } + if r.NodeID == "" { + // the spec allows this but it would unpublish the + // volume from all nodes + return errors.New("missing NodeID") + } + return nil +} + +type ControllerUnpublishVolumeResponse struct{} + type NodeCapabilitySet struct { HasStageUnstageVolume bool }