csi: implement ControllerExpandVolume (#18359)

the first half of volume expansion,
this allows a user to update requested capacity
("capacity_min" and "capacity_max") in a volume
specification file, and re-issue either Register
or Create volume commands (or api calls).

the requested capacity will now be "reconciled"
with the current real capacity of the volume,
issuing a ControllerExpandVolume RPC call
to a running controller plugin, if requested
"capacity_min" is higher than the current
capacity on the volume in state.

csi spec:
https://github.com/container-storage-interface/spec/blob/c918b7f/spec.md#controllerexpandvolume

note: this does not yet cover NodeExpandVolume
This commit is contained in:
Daniel Bennett
2023-09-14 14:13:04 -05:00
committed by GitHub
parent 0329393a28
commit c6dbba7cde
17 changed files with 1055 additions and 89 deletions

View File

@@ -14,14 +14,15 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"golang.org/x/exp/maps"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
)
// PluginTypeCSI implements the CSI plugin interface
@@ -75,6 +76,7 @@ type CSIControllerClient interface {
CreateVolume(ctx context.Context, in *csipbv1.CreateVolumeRequest, opts ...grpc.CallOption) (*csipbv1.CreateVolumeResponse, error)
ListVolumes(ctx context.Context, in *csipbv1.ListVolumesRequest, opts ...grpc.CallOption) (*csipbv1.ListVolumesResponse, error)
DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error)
ControllerExpandVolume(ctx context.Context, in *csipbv1.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerExpandVolumeResponse, error)
CreateSnapshot(ctx context.Context, in *csipbv1.CreateSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.CreateSnapshotResponse, error)
DeleteSnapshot(ctx context.Context, in *csipbv1.DeleteSnapshotRequest, opts ...grpc.CallOption) (*csipbv1.DeleteSnapshotResponse, error)
ListSnapshots(ctx context.Context, in *csipbv1.ListSnapshotsRequest, opts ...grpc.CallOption) (*csipbv1.ListSnapshotsResponse, error)
@@ -89,6 +91,7 @@ type CSINodeClient interface {
NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error)
NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error)
NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error)
NodeExpandVolume(ctx context.Context, in *csipbv1.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeExpandVolumeResponse, error)
}
type client struct {
@@ -510,6 +513,44 @@ func (c *client) ControllerDeleteVolume(ctx context.Context, req *ControllerDele
return err
}
func (c *client) ControllerExpandVolume(ctx context.Context, req *ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*ControllerExpandVolumeResponse, error) {
if err := req.Validate(); err != nil {
return nil, err
}
if err := c.ensureConnected(ctx); err != nil {
return nil, err
}
exReq := req.ToCSIRepresentation()
resp, err := c.controllerClient.ControllerExpandVolume(ctx, exReq, opts...)
if err != nil {
code := status.Code(err)
switch code {
case codes.InvalidArgument:
return nil, fmt.Errorf(
"requested capabilities not compatible with volume %q: %v",
req.ExternalVolumeID, err)
case codes.NotFound:
err = fmt.Errorf("volume %q could not be found: %v", req.ExternalVolumeID, err)
case codes.FailedPrecondition:
err = fmt.Errorf("volume %q cannot be expanded online: %v", req.ExternalVolumeID, err)
case codes.OutOfRange:
return nil, fmt.Errorf(
"unsupported capacity_range for volume %q: %v", req.ExternalVolumeID, err)
case codes.Internal:
err = fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
default:
err = fmt.Errorf("controller plugin returned an error: %v", err)
}
return nil, err
}
return &ControllerExpandVolumeResponse{
CapacityBytes: resp.GetCapacityBytes(),
NodeExpansionRequired: resp.GetNodeExpansionRequired(),
}, nil
}
// compareCapabilities returns an error if the 'got' capabilities aren't found
// within the 'expected' capability.
//
@@ -883,3 +924,7 @@ func (c *client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s
return err
}
func (c *client) NodeExpandVolume(ctx context.Context, req *NodeExpandVolumeRequest, opts ...grpc.CallOption) (*NodeExpandVolumeResponse, error) {
return nil, nil
}

View File

@@ -13,14 +13,16 @@ import (
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"github.com/golang/protobuf/ptypes/wrappers"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/structs"
fake "github.com/hashicorp/nomad/plugins/csi/testing"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/hashicorp/nomad/ci"
"github.com/hashicorp/nomad/nomad/structs"
fake "github.com/hashicorp/nomad/plugins/csi/testing"
)
func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient, *fake.NodeClient, CSIPlugin) {
@@ -42,6 +44,9 @@ func newTestClient(t *testing.T) (*fake.IdentityClient, *fake.ControllerClient,
controllerClient: cc,
nodeClient: nc,
}
t.Cleanup(func() {
_ = client.Close()
})
return ic, cc, nc, client
}
@@ -1170,6 +1175,155 @@ func TestClient_RPC_ControllerListSnapshots(t *testing.T) {
}
}
func TestClient_RPC_ControllerExpandVolume(t *testing.T) {
cases := []struct {
Name string
Request *ControllerExpandVolumeRequest
ExpectCall *csipbv1.ControllerExpandVolumeRequest
ResponseErr error
ExpectedErr error
}{
{
Name: "success",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1",
RequiredBytes: 1,
LimitBytes: 2,
Capability: &VolumeCapability{
AccessMode: VolumeAccessModeMultiNodeSingleWriter,
},
Secrets: map[string]string{"super": "secret"},
},
ExpectCall: &csipbv1.ControllerExpandVolumeRequest{
VolumeId: "vol-1",
CapacityRange: &csipbv1.CapacityRange{
RequiredBytes: 1,
LimitBytes: 2,
},
VolumeCapability: &csipbv1.VolumeCapability{
AccessMode: &csipbv1.VolumeCapability_AccessMode{
Mode: csipbv1.VolumeCapability_AccessMode_Mode(csipbv1.VolumeCapability_AccessMode_MULTI_NODE_SINGLE_WRITER),
},
AccessType: &csipbv1.VolumeCapability_Block{Block: &csipbv1.VolumeCapability_BlockVolume{}},
},
Secrets: map[string]string{"super": "secret"},
},
},
{
Name: "validate only min set",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1",
RequiredBytes: 4,
},
ExpectCall: &csipbv1.ControllerExpandVolumeRequest{
VolumeId: "vol-1",
CapacityRange: &csipbv1.CapacityRange{
RequiredBytes: 4,
},
},
},
{
Name: "validate missing volume ID",
Request: &ControllerExpandVolumeRequest{},
ExpectedErr: errors.New("missing ExternalVolumeID"),
},
{
Name: "validate missing max/min size",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1",
},
ExpectedErr: errors.New("one of LimitBytes or RequiredBytes must be set"),
},
{
Name: "validate min greater than max",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1",
RequiredBytes: 4,
LimitBytes: 2,
},
ExpectedErr: errors.New("LimitBytes cannot be less than RequiredBytes"),
},
{
Name: "grpc error InvalidArgument",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.InvalidArgument, "sad args"),
ExpectedErr: errors.New("requested capabilities not compatible with volume \"vol-1\": rpc error: code = InvalidArgument desc = sad args"),
},
{
Name: "grpc error NotFound",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.NotFound, "does not exist"),
ExpectedErr: errors.New("volume \"vol-1\" could not be found: rpc error: code = NotFound desc = does not exist"),
},
{
Name: "grpc error FailedPrecondition",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.FailedPrecondition, "unsupported"),
ExpectedErr: errors.New("volume \"vol-1\" cannot be expanded online: rpc error: code = FailedPrecondition desc = unsupported"),
},
{
Name: "grpc error OutOfRange",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.OutOfRange, "too small"),
ExpectedErr: errors.New("unsupported capacity_range for volume \"vol-1\": rpc error: code = OutOfRange desc = too small"),
},
{
Name: "grpc error Internal",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
ExpectedErr: errors.New("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
},
{
Name: "grpc error default case",
Request: &ControllerExpandVolumeRequest{
ExternalVolumeID: "vol-1", LimitBytes: 1000},
ResponseErr: status.Errorf(codes.DataLoss, "misc unspecified error"),
ExpectedErr: errors.New("controller plugin returned an error: rpc error: code = DataLoss desc = misc unspecified error"),
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
_, cc, _, client := newTestClient(t)
cc.NextErr = tc.ResponseErr
// the fake client should take ~no time, but set a timeout just in case
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
resp, err := client.ControllerExpandVolume(ctx, tc.Request)
if tc.ExpectedErr != nil {
must.EqError(t, err, tc.ExpectedErr.Error())
return
}
must.NoError(t, err)
must.NotNil(t, resp)
must.Eq(t, tc.ExpectCall, cc.LastExpandVolumeRequest)
})
}
t.Run("connection error", func(t *testing.T) {
c := &client{} // induce c.ensureConnected() error
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
resp, err := c.ControllerExpandVolume(ctx, &ControllerExpandVolumeRequest{
ExternalVolumeID: "valid-id",
RequiredBytes: 1,
})
must.Nil(t, resp)
must.EqError(t, err, "address is empty")
})
}
func TestClient_RPC_NodeStageVolume(t *testing.T) {
ci.Parallel(t)

View File

@@ -11,10 +11,11 @@ import (
"fmt"
"sync"
"google.golang.org/grpc"
"github.com/hashicorp/nomad/plugins/base"
"github.com/hashicorp/nomad/plugins/csi"
"github.com/hashicorp/nomad/plugins/shared/hclspec"
"google.golang.org/grpc"
)
var _ csi.CSIPlugin = &Client{}
@@ -78,6 +79,10 @@ type Client struct {
NextControllerListSnapshotsErr error
ControllerListSnapshotsCallCount int64
NextControllerExpandVolumeResponse *csi.ControllerExpandVolumeResponse
NextControllerExpandVolumeErr error
ControllerExpandVolumeCallCount int64
NextNodeGetCapabilitiesResponse *csi.NodeCapabilitySet
NextNodeGetCapabilitiesErr error
NodeGetCapabilitiesCallCount int64
@@ -98,6 +103,10 @@ type Client struct {
NextNodeUnpublishVolumeErr error
NodeUnpublishVolumeCallCount int64
NextNodeExpandVolumeResponse *csi.NodeExpandVolumeResponse
NextNodeExpandVolumeErr error
NodeExpandVolumeCallCount int64
}
// PluginInfo describes the type and version of a plugin.
@@ -235,6 +244,13 @@ func (c *Client) ControllerListSnapshots(ctx context.Context, req *csi.Controlle
return c.NextControllerListSnapshotsResponse, c.NextControllerListSnapshotsErr
}
func (c *Client) ControllerExpandVolume(ctx context.Context, in *csi.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerExpandVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.ControllerExpandVolumeCallCount++
return c.NextControllerExpandVolumeResponse, c.NextControllerExpandVolumeErr
}
func (c *Client) NodeGetCapabilities(ctx context.Context) (*csi.NodeCapabilitySet, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
@@ -300,6 +316,14 @@ func (c *Client) NodeUnpublishVolume(ctx context.Context, volumeID, targetPath s
return c.NextNodeUnpublishVolumeErr
}
func (c *Client) NodeExpandVolume(ctx context.Context, req *csi.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csi.NodeExpandVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
c.NodeExpandVolumeCallCount++
return c.NextNodeExpandVolumeResponse, c.NextNodeExpandVolumeErr
}
// Close the client and ensure any connections are cleaned up.
func (c *Client) Close() error {
@@ -325,6 +349,9 @@ func (c *Client) Close() error {
c.NextControllerUnpublishVolumeResponse = nil
c.NextControllerUnpublishVolumeErr = fmt.Errorf("closed client")
c.NextControllerExpandVolumeResponse = nil
c.NextControllerExpandVolumeErr = fmt.Errorf("closed client")
c.NextControllerValidateVolumeErr = fmt.Errorf("closed client")
c.NextNodeGetCapabilitiesResponse = nil
@@ -341,5 +368,8 @@ func (c *Client) Close() error {
c.NextNodeUnpublishVolumeErr = fmt.Errorf("closed client")
c.NextNodeExpandVolumeResponse = nil
c.NextNodeExpandVolumeErr = fmt.Errorf("closed client")
return nil
}

View File

@@ -9,9 +9,10 @@ import (
"fmt"
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
"google.golang.org/grpc"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/base"
"google.golang.org/grpc"
)
// CSIPlugin implements a lightweight abstraction layer around a CSI Plugin.
@@ -60,6 +61,9 @@ type CSIPlugin interface {
// external storage provider
ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error)
// ControllerExpandVolume is used to expand a volume's size
ControllerExpandVolume(ctx context.Context, req *ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*ControllerExpandVolumeResponse, error)
// ControllerCreateSnapshot is used to create a volume snapshot in the
// external storage provider
ControllerCreateSnapshot(ctx context.Context, req *ControllerCreateSnapshotRequest, opts ...grpc.CallOption) (*ControllerCreateSnapshotResponse, error)
@@ -101,6 +105,11 @@ type CSIPlugin interface {
// for the given volume.
NodeUnpublishVolume(ctx context.Context, volumeID, targetPath string, opts ...grpc.CallOption) error
// NodeExpandVolume is used to expand a volume. This MUST be called after
// any ControllerExpandVolume is called, but only if that RPC indicates
// that node expansion is required
NodeExpandVolume(ctx context.Context, req *NodeExpandVolumeRequest, opts ...grpc.CallOption) (*NodeExpandVolumeResponse, error)
// Shutdown the client and ensure any connections are cleaned up.
Close() error
}
@@ -492,7 +501,8 @@ func (r *ControllerCreateVolumeRequest) Validate() error {
return errors.New(
"one of LimitBytes or RequiredBytes must be set if CapacityRange is set")
}
if r.CapacityRange.LimitBytes < r.CapacityRange.RequiredBytes {
if r.CapacityRange.LimitBytes > 0 &&
r.CapacityRange.LimitBytes < r.CapacityRange.RequiredBytes {
return errors.New("LimitBytes cannot be less than RequiredBytes")
}
}
@@ -625,6 +635,49 @@ func (r *ControllerDeleteVolumeRequest) Validate() error {
return nil
}
type ControllerExpandVolumeRequest struct {
ExternalVolumeID string
RequiredBytes int64
LimitBytes int64
Capability *VolumeCapability
Secrets structs.CSISecrets
}
func (r *ControllerExpandVolumeRequest) Validate() error {
if r.ExternalVolumeID == "" {
return errors.New("missing ExternalVolumeID")
}
if r.LimitBytes == 0 && r.RequiredBytes == 0 {
return errors.New("one of LimitBytes or RequiredBytes must be set")
}
// per the spec: "A value of 0 is equal to an unspecified field value."
// so in this case, only error if both are set.
if r.LimitBytes > 0 && (r.LimitBytes < r.RequiredBytes) {
return errors.New("LimitBytes cannot be less than RequiredBytes")
}
return nil
}
func (r *ControllerExpandVolumeRequest) ToCSIRepresentation() *csipbv1.ControllerExpandVolumeRequest {
if r == nil {
return nil
}
return &csipbv1.ControllerExpandVolumeRequest{
VolumeId: r.ExternalVolumeID,
CapacityRange: &csipbv1.CapacityRange{
RequiredBytes: r.RequiredBytes,
LimitBytes: r.LimitBytes,
},
Secrets: r.Secrets,
VolumeCapability: r.Capability.ToCSIRepresentation(),
}
}
type ControllerExpandVolumeResponse struct {
CapacityBytes int64
NodeExpansionRequired bool
}
type ControllerListVolumesRequest struct {
MaxEntries int32
StartingToken string
@@ -976,3 +1029,32 @@ func (c *CapacityRange) ToCSIRepresentation() *csipbv1.CapacityRange {
LimitBytes: c.LimitBytes,
}
}
type NodeExpandVolumeRequest struct {
ExternalVolumeID string
RequiredBytes int64
LimitBytes int64
TargetPath string
StagingPath string
Capability *VolumeCapability
}
func (r *NodeExpandVolumeRequest) ToCSIRepresentation() *csipbv1.NodeExpandVolumeRequest {
if r == nil {
return nil
}
return &csipbv1.NodeExpandVolumeRequest{
VolumeId: r.ExternalVolumeID,
VolumePath: r.TargetPath,
CapacityRange: &csipbv1.CapacityRange{
RequiredBytes: r.RequiredBytes,
LimitBytes: r.LimitBytes,
},
StagingTargetPath: r.StagingPath,
VolumeCapability: r.Capability.ToCSIRepresentation(),
}
}
type NodeExpandVolumeResponse struct {
CapacityBytes int64
}

View File

@@ -54,6 +54,8 @@ type ControllerClient struct {
NextUnpublishVolumeResponse *csipbv1.ControllerUnpublishVolumeResponse
NextValidateVolumeCapabilitiesResponse *csipbv1.ValidateVolumeCapabilitiesResponse
NextCreateVolumeResponse *csipbv1.CreateVolumeResponse
NextExpandVolumeResponse *csipbv1.ControllerExpandVolumeResponse
LastExpandVolumeRequest *csipbv1.ControllerExpandVolumeRequest
NextDeleteVolumeResponse *csipbv1.DeleteVolumeResponse
NextListVolumesResponse *csipbv1.ListVolumesResponse
NextCreateSnapshotResponse *csipbv1.CreateSnapshotResponse
@@ -73,6 +75,8 @@ func (c *ControllerClient) Reset() {
c.NextUnpublishVolumeResponse = nil
c.NextValidateVolumeCapabilitiesResponse = nil
c.NextCreateVolumeResponse = nil
c.NextExpandVolumeResponse = nil
c.LastExpandVolumeRequest = nil
c.NextDeleteVolumeResponse = nil
c.NextListVolumesResponse = nil
c.NextCreateSnapshotResponse = nil
@@ -111,6 +115,11 @@ func (c *ControllerClient) CreateVolume(ctx context.Context, in *csipbv1.CreateV
return c.NextCreateVolumeResponse, c.NextErr
}
func (c *ControllerClient) ControllerExpandVolume(ctx context.Context, in *csipbv1.ControllerExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerExpandVolumeResponse, error) {
c.LastExpandVolumeRequest = in
return c.NextExpandVolumeResponse, c.NextErr
}
func (c *ControllerClient) DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error) {
return c.NextDeleteVolumeResponse, c.NextErr
}
@@ -140,6 +149,7 @@ type NodeClient struct {
NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse
NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse
NextUnpublishVolumeResponse *csipbv1.NodeUnpublishVolumeResponse
NextExpandVolumeResponse *csipbv1.NodeExpandVolumeResponse
}
// NewNodeClient returns a new stub NodeClient
@@ -155,6 +165,7 @@ func (c *NodeClient) Reset() {
c.NextUnstageVolumeResponse = nil
c.NextPublishVolumeResponse = nil
c.NextUnpublishVolumeResponse = nil
c.NextExpandVolumeResponse = nil
}
func (c *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipbv1.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetCapabilitiesResponse, error) {
@@ -180,3 +191,7 @@ func (c *NodeClient) NodePublishVolume(ctx context.Context, in *csipbv1.NodePubl
func (c *NodeClient) NodeUnpublishVolume(ctx context.Context, in *csipbv1.NodeUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnpublishVolumeResponse, error) {
return c.NextUnpublishVolumeResponse, c.NextErr
}
func (c *NodeClient) NodeExpandVolume(ctx context.Context, in *csipbv1.NodeExpandVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeExpandVolumeResponse, error) {
return c.NextExpandVolumeResponse, c.NextErr
}