mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
CSI: protobuffer mappings for Create/Delete/List volume RPCs
Note that unset proto fields for volume create should be nil. The CSI spec handles empty fields and nil fields in the protobuf differently, which may result in validation failures for creating volumes with no prior source (and does in testing with the AWS EBS plugin). Refactor the `CreateVolumeRequest` mapping to the protobuf in the plugin client to avoid this bug.
This commit is contained in:
@@ -68,6 +68,9 @@ type CSIControllerClient interface {
|
||||
ControllerPublishVolume(ctx context.Context, in *csipbv1.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerPublishVolumeResponse, error)
|
||||
ControllerUnpublishVolume(ctx context.Context, in *csipbv1.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.ControllerUnpublishVolumeResponse, error)
|
||||
ValidateVolumeCapabilities(ctx context.Context, in *csipbv1.ValidateVolumeCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.ValidateVolumeCapabilitiesResponse, error)
|
||||
CreateVolume(ctx context.Context, in *csipbv1.CreateVolumeRequest, opts ...grpc.CallOption) (*csipbv1.CreateVolumeResponse, error)
|
||||
ListVolumes(ctx context.Context, in *csipbv1.ListVolumesRequest, opts ...grpc.CallOption) (*csipbv1.ListVolumesResponse, error)
|
||||
DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error)
|
||||
}
|
||||
|
||||
// CSINodeClient defines the minimal CSI Node Plugin interface used
|
||||
@@ -383,6 +386,91 @@ func (c *client) ControllerValidateCapabilities(ctx context.Context, req *Contro
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *client) ControllerCreateVolume(ctx context.Context, req *ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*ControllerCreateVolumeResponse, error) {
|
||||
err := req.Validate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
creq := req.ToCSIRepresentation()
|
||||
resp, err := c.controllerClient.CreateVolume(ctx, creq, opts...)
|
||||
|
||||
// these standard gRPC error codes are overloaded with CSI-specific
|
||||
// meanings, so translate them into user-understandable terms
|
||||
// https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume-errors
|
||||
if err != nil {
|
||||
code := status.Code(err)
|
||||
switch code {
|
||||
case codes.InvalidArgument:
|
||||
return nil, fmt.Errorf(
|
||||
"volume %q snapshot source %q is not compatible with these parameters: %v",
|
||||
req.Name, req.ContentSource, err)
|
||||
case codes.NotFound:
|
||||
return nil, fmt.Errorf(
|
||||
"volume %q content source %q does not exist: %v",
|
||||
req.Name, req.ContentSource, err)
|
||||
case codes.AlreadyExists:
|
||||
return nil, fmt.Errorf(
|
||||
"volume %q already exists but is incompatible with these parameters: %v",
|
||||
req.Name, err)
|
||||
case codes.ResourceExhausted:
|
||||
return nil, fmt.Errorf(
|
||||
"unable to provision %q in accessible_topology: %v",
|
||||
req.Name, err)
|
||||
case codes.OutOfRange:
|
||||
return nil, fmt.Errorf(
|
||||
"unsupported capacity_range for volume %q: %v", req.Name, err)
|
||||
case codes.Internal:
|
||||
return nil, fmt.Errorf(
|
||||
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return NewCreateVolumeResponse(resp), nil
|
||||
}
|
||||
|
||||
func (c *client) ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error) {
|
||||
err := req.Validate()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
creq := req.ToCSIRepresentation()
|
||||
resp, err := c.controllerClient.ListVolumes(ctx, creq, opts...)
|
||||
if err != nil {
|
||||
code := status.Code(err)
|
||||
switch code {
|
||||
case codes.Aborted:
|
||||
return nil, fmt.Errorf(
|
||||
"invalid starting token %q: %v", req.StartingToken, err)
|
||||
case codes.Internal:
|
||||
return nil, fmt.Errorf(
|
||||
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
|
||||
}
|
||||
return nil, err
|
||||
}
|
||||
return NewListVolumesResponse(resp), nil
|
||||
}
|
||||
|
||||
func (c *client) ControllerDeleteVolume(ctx context.Context, req *ControllerDeleteVolumeRequest, opts ...grpc.CallOption) error {
|
||||
err := req.Validate()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
creq := req.ToCSIRepresentation()
|
||||
_, err = c.controllerClient.DeleteVolume(ctx, creq, opts...)
|
||||
if err != nil {
|
||||
code := status.Code(err)
|
||||
switch code {
|
||||
case codes.FailedPrecondition:
|
||||
return fmt.Errorf("volume %q is in use: %v", req.ExternalVolumeID, err)
|
||||
case codes.Internal:
|
||||
return fmt.Errorf(
|
||||
"controller plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
|
||||
}
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// compareCapabilities returns an error if the 'got' capabilities aren't found
|
||||
// within the 'expected' capability.
|
||||
//
|
||||
|
||||
@@ -698,7 +698,249 @@ func TestClient_RPC_ControllerValidateVolume(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_RPC_ControllerCreateVolume(t *testing.T) {
|
||||
|
||||
cases := []struct {
|
||||
Name string
|
||||
CapacityRange *CapacityRange
|
||||
ContentSource *VolumeContentSource
|
||||
ResponseErr error
|
||||
Response *csipbv1.CreateVolumeResponse
|
||||
ExpectedErr error
|
||||
}{
|
||||
{
|
||||
Name: "handles underlying grpc errors",
|
||||
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
|
||||
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
|
||||
},
|
||||
|
||||
{
|
||||
Name: "handles error invalid capacity range",
|
||||
CapacityRange: &CapacityRange{
|
||||
RequiredBytes: 1000,
|
||||
LimitBytes: 500,
|
||||
},
|
||||
ExpectedErr: errors.New("LimitBytes cannot be less than RequiredBytes"),
|
||||
},
|
||||
|
||||
{
|
||||
Name: "handles error invalid content source",
|
||||
ContentSource: &VolumeContentSource{
|
||||
SnapshotID: "snap-12345",
|
||||
CloneID: "vol-12345",
|
||||
},
|
||||
ExpectedErr: errors.New(
|
||||
"one of SnapshotID or CloneID must be set if ContentSource is set"),
|
||||
},
|
||||
|
||||
{
|
||||
Name: "handles success missing source and range",
|
||||
Response: &csipbv1.CreateVolumeResponse{},
|
||||
},
|
||||
|
||||
{
|
||||
Name: "handles success with capacity range and source",
|
||||
CapacityRange: &CapacityRange{
|
||||
RequiredBytes: 500,
|
||||
LimitBytes: 1000,
|
||||
},
|
||||
ContentSource: &VolumeContentSource{
|
||||
SnapshotID: "snap-12345",
|
||||
},
|
||||
Response: &csipbv1.CreateVolumeResponse{
|
||||
Volume: &csipbv1.Volume{
|
||||
CapacityBytes: 1000,
|
||||
ContentSource: &csipbv1.VolumeContentSource{
|
||||
Type: &csipbv1.VolumeContentSource_Snapshot{
|
||||
Snapshot: &csipbv1.VolumeContentSource_SnapshotSource{
|
||||
SnapshotId: "snap-12345",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
_, cc, _, client := newTestClient()
|
||||
defer client.Close()
|
||||
|
||||
req := &ControllerCreateVolumeRequest{
|
||||
Name: "vol-123456",
|
||||
CapacityRange: tc.CapacityRange,
|
||||
VolumeCapabilities: []*VolumeCapability{
|
||||
{
|
||||
AccessType: VolumeAccessTypeMount,
|
||||
AccessMode: VolumeAccessModeMultiNodeMultiWriter,
|
||||
},
|
||||
},
|
||||
Parameters: map[string]string{},
|
||||
Secrets: structs.CSISecrets{},
|
||||
ContentSource: tc.ContentSource,
|
||||
AccessibilityRequirements: &TopologyRequirement{},
|
||||
}
|
||||
|
||||
cc.NextCreateVolumeResponse = tc.Response
|
||||
cc.NextErr = tc.ResponseErr
|
||||
|
||||
resp, err := client.ControllerCreateVolume(context.TODO(), req)
|
||||
if tc.ExpectedErr != nil {
|
||||
require.EqualError(t, err, tc.ExpectedErr.Error())
|
||||
return
|
||||
}
|
||||
require.NoError(t, err, tc.Name)
|
||||
if tc.Response == nil {
|
||||
require.Nil(t, resp)
|
||||
return
|
||||
}
|
||||
if tc.CapacityRange != nil {
|
||||
require.Greater(t, resp.Volume.CapacityBytes, int64(0))
|
||||
}
|
||||
if tc.ContentSource != nil {
|
||||
require.Equal(t, tc.ContentSource.CloneID, resp.Volume.ContentSource.CloneID)
|
||||
require.Equal(t, tc.ContentSource.SnapshotID, resp.Volume.ContentSource.SnapshotID)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_RPC_ControllerDeleteVolume(t *testing.T) {
|
||||
|
||||
cases := []struct {
|
||||
Name string
|
||||
Request *ControllerDeleteVolumeRequest
|
||||
ResponseErr error
|
||||
ExpectedErr error
|
||||
}{
|
||||
{
|
||||
Name: "handles underlying grpc errors",
|
||||
Request: &ControllerDeleteVolumeRequest{ExternalVolumeID: "vol-12345"},
|
||||
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
|
||||
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
|
||||
},
|
||||
|
||||
{
|
||||
Name: "handles error missing volume ID",
|
||||
Request: &ControllerDeleteVolumeRequest{},
|
||||
ExpectedErr: errors.New("missing ExternalVolumeID"),
|
||||
},
|
||||
|
||||
{
|
||||
Name: "handles success",
|
||||
Request: &ControllerDeleteVolumeRequest{ExternalVolumeID: "vol-12345"},
|
||||
},
|
||||
}
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
_, cc, _, client := newTestClient()
|
||||
defer client.Close()
|
||||
|
||||
cc.NextErr = tc.ResponseErr
|
||||
err := client.ControllerDeleteVolume(context.TODO(), tc.Request)
|
||||
if tc.ExpectedErr != nil {
|
||||
require.EqualError(t, err, tc.ExpectedErr.Error())
|
||||
return
|
||||
}
|
||||
require.NoError(t, err, tc.Name)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_RPC_ControllerListVolume(t *testing.T) {
|
||||
|
||||
cases := []struct {
|
||||
Name string
|
||||
Request *ControllerListVolumesRequest
|
||||
ResponseErr error
|
||||
ExpectedErr error
|
||||
}{
|
||||
{
|
||||
Name: "handles underlying grpc errors",
|
||||
Request: &ControllerListVolumesRequest{},
|
||||
ResponseErr: status.Errorf(codes.Internal, "some grpc error"),
|
||||
ExpectedErr: fmt.Errorf("controller plugin returned an internal error, check the plugin allocation logs for more information: rpc error: code = Internal desc = some grpc error"),
|
||||
},
|
||||
|
||||
{
|
||||
Name: "handles error invalid max entries",
|
||||
Request: &ControllerListVolumesRequest{MaxEntries: -1},
|
||||
ExpectedErr: errors.New("MaxEntries cannot be negative"),
|
||||
},
|
||||
|
||||
{
|
||||
Name: "handles success",
|
||||
Request: &ControllerListVolumesRequest{},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
t.Run(tc.Name, func(t *testing.T) {
|
||||
_, cc, _, client := newTestClient()
|
||||
defer client.Close()
|
||||
|
||||
cc.NextErr = tc.ResponseErr
|
||||
if tc.ResponseErr != nil {
|
||||
// note: there's nothing interesting to assert here other than
|
||||
// that we don't throw a NPE during transformation from
|
||||
// protobuf to our struct
|
||||
cc.NextListVolumesResponse = &csipbv1.ListVolumesResponse{
|
||||
Entries: []*csipbv1.ListVolumesResponse_Entry{
|
||||
{
|
||||
Volume: &csipbv1.Volume{
|
||||
CapacityBytes: 1000000,
|
||||
VolumeId: "vol-0",
|
||||
VolumeContext: map[string]string{"foo": "bar"},
|
||||
|
||||
ContentSource: &csipbv1.VolumeContentSource{},
|
||||
AccessibleTopology: []*csipbv1.Topology{
|
||||
{
|
||||
Segments: map[string]string{"rack": "A"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
Volume: &csipbv1.Volume{
|
||||
VolumeId: "vol-1",
|
||||
AccessibleTopology: []*csipbv1.Topology{
|
||||
{
|
||||
Segments: map[string]string{"rack": "A"},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
|
||||
{
|
||||
Volume: &csipbv1.Volume{
|
||||
VolumeId: "vol-3",
|
||||
ContentSource: &csipbv1.VolumeContentSource{
|
||||
Type: &csipbv1.VolumeContentSource_Snapshot{
|
||||
Snapshot: &csipbv1.VolumeContentSource_SnapshotSource{
|
||||
SnapshotId: "snap-12345",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
NextToken: "abcdef",
|
||||
}
|
||||
}
|
||||
|
||||
resp, err := client.ControllerListVolumes(context.TODO(), tc.Request)
|
||||
if tc.ExpectedErr != nil {
|
||||
require.EqualError(t, err, tc.ExpectedErr.Error())
|
||||
return
|
||||
}
|
||||
require.NoError(t, err, tc.Name)
|
||||
require.NotNil(t, resp)
|
||||
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_RPC_NodeStageVolume(t *testing.T) {
|
||||
|
||||
@@ -50,6 +50,17 @@ type Client struct {
|
||||
NextControllerUnpublishVolumeErr error
|
||||
ControllerUnpublishVolumeCallCount int64
|
||||
|
||||
NextControllerCreateVolumeResponse *csi.ControllerCreateVolumeResponse
|
||||
NextControllerCreateVolumeErr error
|
||||
ControllerCreateVolumeCallCount int64
|
||||
|
||||
NextControllerDeleteVolumeErr error
|
||||
ControllerDeleteVolumeCallCount int64
|
||||
|
||||
NextControllerListVolumesResponse *csi.ControllerListVolumesResponse
|
||||
NextControllerListVolumesErr error
|
||||
ControllerListVolumesCallCount int64
|
||||
|
||||
NextControllerValidateVolumeErr error
|
||||
ControllerValidateVolumeCallCount int64
|
||||
|
||||
@@ -168,6 +179,27 @@ func (c *Client) ControllerValidateCapabilities(ctx context.Context, req *csi.Co
|
||||
return c.NextControllerValidateVolumeErr
|
||||
}
|
||||
|
||||
func (c *Client) ControllerCreateVolume(ctx context.Context, in *csi.ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerCreateVolumeResponse, error) {
|
||||
c.Mu.Lock()
|
||||
defer c.Mu.Unlock()
|
||||
c.ControllerCreateVolumeCallCount++
|
||||
return c.NextControllerCreateVolumeResponse, c.NextControllerCreateVolumeErr
|
||||
}
|
||||
|
||||
func (c *Client) ControllerDeleteVolume(ctx context.Context, req *csi.ControllerDeleteVolumeRequest, opts ...grpc.CallOption) error {
|
||||
c.Mu.Lock()
|
||||
defer c.Mu.Unlock()
|
||||
c.ControllerDeleteVolumeCallCount++
|
||||
return c.NextControllerDeleteVolumeErr
|
||||
}
|
||||
|
||||
func (c *Client) ControllerListVolumes(ctx context.Context, req *csi.ControllerListVolumesRequest, opts ...grpc.CallOption) (*csi.ControllerListVolumesResponse, error) {
|
||||
c.Mu.Lock()
|
||||
defer c.Mu.Unlock()
|
||||
c.ControllerListVolumesCallCount++
|
||||
return c.NextControllerListVolumesResponse, c.NextControllerListVolumesErr
|
||||
}
|
||||
|
||||
func (c *Client) NodeGetCapabilities(ctx context.Context) (*csi.NodeCapabilitySet, error) {
|
||||
c.Mu.Lock()
|
||||
defer c.Mu.Unlock()
|
||||
|
||||
@@ -45,6 +45,18 @@ type CSIPlugin interface {
|
||||
// supports the requested capability.
|
||||
ControllerValidateCapabilities(ctx context.Context, req *ControllerValidateVolumeRequest, opts ...grpc.CallOption) error
|
||||
|
||||
// ControllerCreateVolume is used to create a remote volume in the
|
||||
// external storage provider
|
||||
ControllerCreateVolume(ctx context.Context, req *ControllerCreateVolumeRequest, opts ...grpc.CallOption) (*ControllerCreateVolumeResponse, error)
|
||||
|
||||
// ControllerDeleteVolume is used to delete a remote volume in the
|
||||
// external storage provider
|
||||
ControllerDeleteVolume(ctx context.Context, req *ControllerDeleteVolumeRequest, opts ...grpc.CallOption) error
|
||||
|
||||
// ControllerListVolumes is used to list all volumes available in the
|
||||
// external storage provider
|
||||
ControllerListVolumes(ctx context.Context, req *ControllerListVolumesRequest, opts ...grpc.CallOption) (*ControllerListVolumesResponse, error)
|
||||
|
||||
// NodeGetCapabilities is used to return the available capabilities from the
|
||||
// Node Service.
|
||||
NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, error)
|
||||
@@ -392,6 +404,259 @@ func (r *ControllerUnpublishVolumeRequest) Validate() error {
|
||||
|
||||
type ControllerUnpublishVolumeResponse struct{}
|
||||
|
||||
type ControllerCreateVolumeRequest struct {
|
||||
// note that Name is intentionally differentiated from both CSIVolume.ID
|
||||
// and ExternalVolumeID. This name is only a recommendation for the
|
||||
// storage provider, and many will discard this suggestion
|
||||
Name string
|
||||
CapacityRange *CapacityRange
|
||||
VolumeCapabilities []*VolumeCapability
|
||||
Parameters map[string]string
|
||||
Secrets structs.CSISecrets
|
||||
ContentSource *VolumeContentSource
|
||||
AccessibilityRequirements *TopologyRequirement
|
||||
}
|
||||
|
||||
func (r *ControllerCreateVolumeRequest) ToCSIRepresentation() *csipbv1.CreateVolumeRequest {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
caps := make([]*csipbv1.VolumeCapability, 0, len(r.VolumeCapabilities))
|
||||
for _, cap := range r.VolumeCapabilities {
|
||||
caps = append(caps, cap.ToCSIRepresentation())
|
||||
}
|
||||
req := &csipbv1.CreateVolumeRequest{
|
||||
Name: r.Name,
|
||||
CapacityRange: r.CapacityRange.ToCSIRepresentation(),
|
||||
VolumeCapabilities: caps,
|
||||
Parameters: r.Parameters,
|
||||
Secrets: r.Secrets,
|
||||
VolumeContentSource: r.ContentSource.ToCSIRepresentation(),
|
||||
AccessibilityRequirements: r.AccessibilityRequirements.ToCSIRepresentation(),
|
||||
}
|
||||
|
||||
return req
|
||||
}
|
||||
|
||||
func (r *ControllerCreateVolumeRequest) Validate() error {
|
||||
if r.Name == "" {
|
||||
return errors.New("missing Name")
|
||||
}
|
||||
if r.VolumeCapabilities == nil {
|
||||
return errors.New("missing VolumeCapabilities")
|
||||
}
|
||||
if r.CapacityRange != nil {
|
||||
if r.CapacityRange.LimitBytes == 0 && r.CapacityRange.RequiredBytes == 0 {
|
||||
return errors.New(
|
||||
"one of LimitBytes or RequiredBytes must be set if CapacityRange is set")
|
||||
}
|
||||
if r.CapacityRange.LimitBytes < r.CapacityRange.RequiredBytes {
|
||||
return errors.New("LimitBytes cannot be less than RequiredBytes")
|
||||
}
|
||||
}
|
||||
if r.ContentSource != nil {
|
||||
if r.ContentSource.CloneID != "" && r.ContentSource.SnapshotID != "" {
|
||||
return errors.New(
|
||||
"one of SnapshotID or CloneID must be set if ContentSource is set")
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// VolumeContentSource is snapshot or volume that the plugin will use to
|
||||
// create the new volume. At most one of these fields can be set, but nil (and
|
||||
// not an empty struct) is expected by CSI plugins if neither field is set.
|
||||
type VolumeContentSource struct {
|
||||
SnapshotID string
|
||||
CloneID string
|
||||
}
|
||||
|
||||
func (vcr *VolumeContentSource) ToCSIRepresentation() *csipbv1.VolumeContentSource {
|
||||
if vcr == nil {
|
||||
return nil
|
||||
}
|
||||
if vcr.CloneID != "" {
|
||||
return &csipbv1.VolumeContentSource{
|
||||
Type: &csipbv1.VolumeContentSource_Volume{
|
||||
Volume: &csipbv1.VolumeContentSource_VolumeSource{
|
||||
VolumeId: vcr.CloneID,
|
||||
},
|
||||
},
|
||||
}
|
||||
} else if vcr.SnapshotID != "" {
|
||||
return &csipbv1.VolumeContentSource{
|
||||
Type: &csipbv1.VolumeContentSource_Snapshot{
|
||||
Snapshot: &csipbv1.VolumeContentSource_SnapshotSource{
|
||||
SnapshotId: vcr.SnapshotID,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
// Nomad's RPCs will hand us an empty struct, not nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func newVolumeContentSource(src *csipbv1.VolumeContentSource) *VolumeContentSource {
|
||||
return &VolumeContentSource{
|
||||
SnapshotID: src.GetSnapshot().GetSnapshotId(),
|
||||
CloneID: src.GetVolume().GetVolumeId(),
|
||||
}
|
||||
}
|
||||
|
||||
type TopologyRequirement struct {
|
||||
Requisite []*Topology
|
||||
Preferred []*Topology
|
||||
}
|
||||
|
||||
func (tr *TopologyRequirement) ToCSIRepresentation() *csipbv1.TopologyRequirement {
|
||||
if tr == nil {
|
||||
return nil
|
||||
}
|
||||
result := &csipbv1.TopologyRequirement{
|
||||
Requisite: []*csipbv1.Topology{},
|
||||
Preferred: []*csipbv1.Topology{},
|
||||
}
|
||||
for _, topo := range tr.Requisite {
|
||||
result.Requisite = append(result.Requisite,
|
||||
&csipbv1.Topology{Segments: topo.Segments})
|
||||
}
|
||||
for _, topo := range tr.Preferred {
|
||||
result.Preferred = append(result.Preferred,
|
||||
&csipbv1.Topology{Segments: topo.Segments})
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
func newTopologies(src []*csipbv1.Topology) []*Topology {
|
||||
t := []*Topology{}
|
||||
for _, topo := range src {
|
||||
t = append(t, &Topology{Segments: topo.Segments})
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
type ControllerCreateVolumeResponse struct {
|
||||
Volume *Volume
|
||||
}
|
||||
|
||||
func NewCreateVolumeResponse(resp *csipbv1.CreateVolumeResponse) *ControllerCreateVolumeResponse {
|
||||
vol := resp.GetVolume()
|
||||
return &ControllerCreateVolumeResponse{Volume: &Volume{
|
||||
CapacityBytes: vol.GetCapacityBytes(),
|
||||
ExternalVolumeID: vol.GetVolumeId(),
|
||||
VolumeContext: vol.GetVolumeContext(),
|
||||
ContentSource: newVolumeContentSource(vol.GetContentSource()),
|
||||
}}
|
||||
}
|
||||
|
||||
type Volume struct {
|
||||
CapacityBytes int64
|
||||
|
||||
// this is differentiated from VolumeID so as not to create confusion
|
||||
// between the Nomad CSIVolume.ID and the storage provider's ID.
|
||||
ExternalVolumeID string
|
||||
VolumeContext map[string]string
|
||||
ContentSource *VolumeContentSource
|
||||
AccessibleTopology []*Topology
|
||||
}
|
||||
|
||||
type ControllerDeleteVolumeRequest struct {
|
||||
ExternalVolumeID string
|
||||
Secrets structs.CSISecrets
|
||||
}
|
||||
|
||||
func (r *ControllerDeleteVolumeRequest) ToCSIRepresentation() *csipbv1.DeleteVolumeRequest {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
return &csipbv1.DeleteVolumeRequest{
|
||||
VolumeId: r.ExternalVolumeID,
|
||||
Secrets: r.Secrets,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ControllerDeleteVolumeRequest) Validate() error {
|
||||
if r.ExternalVolumeID == "" {
|
||||
return errors.New("missing ExternalVolumeID")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type ControllerListVolumesRequest struct {
|
||||
MaxEntries int32
|
||||
StartingToken string
|
||||
}
|
||||
|
||||
func (r *ControllerListVolumesRequest) ToCSIRepresentation() *csipbv1.ListVolumesRequest {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
return &csipbv1.ListVolumesRequest{
|
||||
MaxEntries: r.MaxEntries,
|
||||
StartingToken: r.StartingToken,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *ControllerListVolumesRequest) Validate() error {
|
||||
if r.MaxEntries < 0 {
|
||||
return errors.New("MaxEntries cannot be negative")
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type ControllerListVolumesResponse struct {
|
||||
Entries []*ListVolumesResponse_Entry
|
||||
NextToken string
|
||||
}
|
||||
|
||||
func NewListVolumesResponse(resp *csipbv1.ListVolumesResponse) *ControllerListVolumesResponse {
|
||||
if resp == nil {
|
||||
return &ControllerListVolumesResponse{}
|
||||
}
|
||||
entries := []*ListVolumesResponse_Entry{}
|
||||
if resp.Entries != nil {
|
||||
for _, entry := range resp.Entries {
|
||||
vol := entry.GetVolume()
|
||||
status := entry.GetStatus()
|
||||
entries = append(entries, &ListVolumesResponse_Entry{
|
||||
Volume: &Volume{
|
||||
CapacityBytes: vol.CapacityBytes,
|
||||
ExternalVolumeID: vol.VolumeId,
|
||||
VolumeContext: vol.VolumeContext,
|
||||
ContentSource: newVolumeContentSource(vol.ContentSource),
|
||||
AccessibleTopology: newTopologies(vol.AccessibleTopology),
|
||||
},
|
||||
Status: &ListVolumesResponse_VolumeStatus{
|
||||
PublishedNodeIds: status.GetPublishedNodeIds(),
|
||||
VolumeCondition: &VolumeCondition{
|
||||
Abnormal: status.GetVolumeCondition().GetAbnormal(),
|
||||
Message: status.GetVolumeCondition().GetMessage(),
|
||||
},
|
||||
},
|
||||
})
|
||||
}
|
||||
}
|
||||
return &ControllerListVolumesResponse{
|
||||
Entries: entries,
|
||||
NextToken: resp.NextToken,
|
||||
}
|
||||
}
|
||||
|
||||
type ListVolumesResponse_Entry struct {
|
||||
Volume *Volume
|
||||
Status *ListVolumesResponse_VolumeStatus
|
||||
}
|
||||
|
||||
type ListVolumesResponse_VolumeStatus struct {
|
||||
PublishedNodeIds []string
|
||||
VolumeCondition *VolumeCondition
|
||||
}
|
||||
|
||||
type VolumeCondition struct {
|
||||
Abnormal bool
|
||||
Message string
|
||||
}
|
||||
|
||||
type NodeCapabilitySet struct {
|
||||
HasStageUnstageVolume bool
|
||||
}
|
||||
@@ -531,3 +796,18 @@ func (c *VolumeCapability) ToCSIRepresentation() *csipbv1.VolumeCapability {
|
||||
|
||||
return vc
|
||||
}
|
||||
|
||||
type CapacityRange struct {
|
||||
RequiredBytes int64
|
||||
LimitBytes int64
|
||||
}
|
||||
|
||||
func (c *CapacityRange) ToCSIRepresentation() *csipbv1.CapacityRange {
|
||||
if c == nil {
|
||||
return nil
|
||||
}
|
||||
return &csipbv1.CapacityRange{
|
||||
RequiredBytes: c.RequiredBytes,
|
||||
LimitBytes: c.LimitBytes,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package testing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
|
||||
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
||||
"google.golang.org/grpc"
|
||||
@@ -49,6 +50,9 @@ type ControllerClient struct {
|
||||
NextPublishVolumeResponse *csipbv1.ControllerPublishVolumeResponse
|
||||
NextUnpublishVolumeResponse *csipbv1.ControllerUnpublishVolumeResponse
|
||||
NextValidateVolumeCapabilitiesResponse *csipbv1.ValidateVolumeCapabilitiesResponse
|
||||
NextCreateVolumeResponse *csipbv1.CreateVolumeResponse
|
||||
NextDeleteVolumeResponse *csipbv1.DeleteVolumeResponse
|
||||
NextListVolumesResponse *csipbv1.ListVolumesResponse
|
||||
}
|
||||
|
||||
// NewControllerClient returns a new ControllerClient
|
||||
@@ -62,6 +66,9 @@ func (f *ControllerClient) Reset() {
|
||||
f.NextPublishVolumeResponse = nil
|
||||
f.NextUnpublishVolumeResponse = nil
|
||||
f.NextValidateVolumeCapabilitiesResponse = nil
|
||||
f.NextCreateVolumeResponse = nil
|
||||
f.NextDeleteVolumeResponse = nil
|
||||
f.NextListVolumesResponse = nil
|
||||
}
|
||||
|
||||
func (c *ControllerClient) ControllerGetCapabilities(ctx context.Context, in *csipbv1.ControllerGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.ControllerGetCapabilitiesResponse, error) {
|
||||
@@ -80,6 +87,29 @@ func (c *ControllerClient) ValidateVolumeCapabilities(ctx context.Context, in *c
|
||||
return c.NextValidateVolumeCapabilitiesResponse, c.NextErr
|
||||
}
|
||||
|
||||
func (c *ControllerClient) CreateVolume(ctx context.Context, in *csipbv1.CreateVolumeRequest, opts ...grpc.CallOption) (*csipbv1.CreateVolumeResponse, error) {
|
||||
if in.VolumeContentSource != nil {
|
||||
if in.VolumeContentSource.Type == nil || (in.VolumeContentSource.Type ==
|
||||
&csipbv1.VolumeContentSource_Volume{
|
||||
Volume: &csipbv1.VolumeContentSource_VolumeSource{VolumeId: ""},
|
||||
}) || (in.VolumeContentSource.Type ==
|
||||
&csipbv1.VolumeContentSource_Snapshot{
|
||||
Snapshot: &csipbv1.VolumeContentSource_SnapshotSource{SnapshotId: ""},
|
||||
}) {
|
||||
return nil, fmt.Errorf("empty content source should be nil")
|
||||
}
|
||||
}
|
||||
return c.NextCreateVolumeResponse, c.NextErr
|
||||
}
|
||||
|
||||
func (c *ControllerClient) DeleteVolume(ctx context.Context, in *csipbv1.DeleteVolumeRequest, opts ...grpc.CallOption) (*csipbv1.DeleteVolumeResponse, error) {
|
||||
return c.NextDeleteVolumeResponse, c.NextErr
|
||||
}
|
||||
|
||||
func (c *ControllerClient) ListVolumes(ctx context.Context, in *csipbv1.ListVolumesRequest, opts ...grpc.CallOption) (*csipbv1.ListVolumesResponse, error) {
|
||||
return c.NextListVolumesResponse, c.NextErr
|
||||
}
|
||||
|
||||
// NodeClient is a CSI Node client used for testing
|
||||
type NodeClient struct {
|
||||
NextErr error
|
||||
|
||||
Reference in New Issue
Block a user