mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 10:25:42 +03:00
csi: Add NodePublishVolume RPCs
This commit is contained in:
committed by
Tim Gross
parent
639bcdf813
commit
9f73f09f57
@@ -69,6 +69,7 @@ type CSINodeClient interface {
|
||||
NodeGetInfo(ctx context.Context, in *csipbv1.NodeGetInfoRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetInfoResponse, error)
|
||||
NodeStageVolume(ctx context.Context, in *csipbv1.NodeStageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeStageVolumeResponse, error)
|
||||
NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error)
|
||||
NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error)
|
||||
}
|
||||
|
||||
type client struct {
|
||||
@@ -334,7 +335,6 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging
|
||||
if c.nodeClient == nil {
|
||||
return fmt.Errorf("Client not initialized")
|
||||
}
|
||||
|
||||
// These errors should not be returned during production use but exist as aids
|
||||
// during Nomad Development
|
||||
if volumeID == "" {
|
||||
@@ -354,3 +354,21 @@ func (c *client) NodeUnstageVolume(ctx context.Context, volumeID string, staging
|
||||
_, err := c.nodeClient.NodeUnstageVolume(ctx, req)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *client) NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest) error {
|
||||
if c == nil {
|
||||
return fmt.Errorf("Client not initialized")
|
||||
}
|
||||
if c.nodeClient == nil {
|
||||
return fmt.Errorf("Client not initialized")
|
||||
}
|
||||
|
||||
if err := req.Validate(); err != nil {
|
||||
return fmt.Errorf("validation error: %v", err)
|
||||
}
|
||||
|
||||
// NodePublishVolume's response contains no extra data. If err == nil, we were
|
||||
// successful.
|
||||
_, err := c.nodeClient.NodePublishVolume(ctx, req.ToCSIRepresentation())
|
||||
return err
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package csi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
@@ -482,3 +483,59 @@ func TestClient_RPC_NodeUnstageVolume(t *testing.T) {
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestClient_RPC_NodePublishVolume(t *testing.T) {
|
||||
cases := []struct {
|
||||
Name string
|
||||
Request *NodePublishVolumeRequest
|
||||
ResponseErr error
|
||||
Response *csipbv1.NodePublishVolumeResponse
|
||||
ExpectedErr error
|
||||
}{
|
||||
{
|
||||
Name: "handles underlying grpc errors",
|
||||
Request: &NodePublishVolumeRequest{
|
||||
VolumeID: "foo",
|
||||
TargetPath: "/dev/null",
|
||||
VolumeCapability: &VolumeCapability{},
|
||||
},
|
||||
ResponseErr: fmt.Errorf("some grpc error"),
|
||||
ExpectedErr: fmt.Errorf("some grpc error"),
|
||||
},
|
||||
{
|
||||
Name: "handles success",
|
||||
Request: &NodePublishVolumeRequest{
|
||||
VolumeID: "foo",
|
||||
TargetPath: "/dev/null",
|
||||
VolumeCapability: &VolumeCapability{},
|
||||
},
|
||||
ResponseErr: nil,
|
||||
ExpectedErr: nil,
|
||||
},
|
||||
{
|
||||
Name: "Performs validation of the publish volume request",
|
||||
Request: &NodePublishVolumeRequest{
|
||||
VolumeID: "",
|
||||
},
|
||||
ResponseErr: nil,
|
||||
ExpectedErr: errors.New("missing VolumeID"),
|
||||
},
|
||||
}
|
||||
|
||||
for _, c := range cases {
|
||||
t.Run(c.Name, func(t *testing.T) {
|
||||
_, _, nc, client := newTestClient()
|
||||
defer client.Close()
|
||||
|
||||
nc.NextErr = c.ResponseErr
|
||||
nc.NextPublishVolumeResponse = c.Response
|
||||
|
||||
err := client.NodePublishVolume(context.TODO(), c.Request)
|
||||
if c.ExpectedErr != nil {
|
||||
require.Error(t, c.ExpectedErr, err)
|
||||
} else {
|
||||
require.Nil(t, err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
@@ -56,6 +56,9 @@ type Client struct {
|
||||
|
||||
NextNodeUnstageVolumeErr error
|
||||
NodeUnstageVolumeCallCount int64
|
||||
|
||||
NextNodePublishVolumeErr error
|
||||
NodePublishVolumeCallCount int64
|
||||
}
|
||||
|
||||
// PluginInfo describes the type and version of a plugin.
|
||||
@@ -178,6 +181,15 @@ func (c *Client) NodeUnstageVolume(ctx context.Context, volumeID string, staging
|
||||
return c.NextNodeUnstageVolumeErr
|
||||
}
|
||||
|
||||
func (c *Client) NodePublishVolume(ctx context.Context, req *csi.NodePublishVolumeRequest) error {
|
||||
c.Mu.Lock()
|
||||
defer c.Mu.Unlock()
|
||||
|
||||
c.NodePublishVolumeCallCount++
|
||||
|
||||
return c.NextNodePublishVolumeErr
|
||||
}
|
||||
|
||||
// Shutdown the client and ensure any connections are cleaned up.
|
||||
func (c *Client) Close() error {
|
||||
return nil
|
||||
|
||||
@@ -2,6 +2,7 @@ package csi
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
csipbv1 "github.com/container-storage-interface/spec/lib/go/csi"
|
||||
@@ -54,10 +55,79 @@ type CSIPlugin interface {
|
||||
// If err == nil, the response should be assumed to be successful.
|
||||
NodeUnstageVolume(ctx context.Context, volumeID string, stagingTargetPath string) error
|
||||
|
||||
// NodePublishVolume is used to prepare a volume for use by an allocation.
|
||||
// if err == nil the response should be assumed to be successful.
|
||||
NodePublishVolume(ctx context.Context, req *NodePublishVolumeRequest) error
|
||||
|
||||
// Shutdown the client and ensure any connections are cleaned up.
|
||||
Close() error
|
||||
}
|
||||
|
||||
type NodePublishVolumeRequest struct {
|
||||
// The ID of the volume to publish.
|
||||
VolumeID string
|
||||
|
||||
// If the volume was attached via a call to `ControllerPublishVolume` then
|
||||
// we need to provide the returned PublishContext here.
|
||||
PublishContext map[string]string
|
||||
|
||||
// The path to which the volume was staged by `NodeStageVolume`.
|
||||
// It MUST be an absolute path in the root filesystem of the process
|
||||
// serving this request.
|
||||
// E.g {the plugins internal mount path}/staging/volumeid/...
|
||||
//
|
||||
// It MUST be set if the Node Plugin implements the
|
||||
// `STAGE_UNSTAGE_VOLUME` node capability.
|
||||
StagingTargetPath string
|
||||
|
||||
// The path to which the volume will be published.
|
||||
// It MUST be an absolute path in the root filesystem of the process serving this
|
||||
// request.
|
||||
// E.g {the plugins internal mount path}/per-alloc/allocid/volumeid/...
|
||||
//
|
||||
// The CO SHALL ensure uniqueness of target_path per volume.
|
||||
// The CO SHALL ensure that the parent directory of this path exists
|
||||
// and that the process serving the request has `read` and `write`
|
||||
// permissions to that parent directory.
|
||||
TargetPath string
|
||||
|
||||
// Volume capability describing how the CO intends to use this volume.
|
||||
VolumeCapability *VolumeCapability
|
||||
|
||||
Readonly bool
|
||||
|
||||
// Reserved for future use.
|
||||
Secrets map[string]string
|
||||
}
|
||||
|
||||
func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVolumeRequest {
|
||||
return &csipbv1.NodePublishVolumeRequest{
|
||||
VolumeId: r.VolumeID,
|
||||
PublishContext: r.PublishContext,
|
||||
StagingTargetPath: r.StagingTargetPath,
|
||||
TargetPath: r.TargetPath,
|
||||
VolumeCapability: r.VolumeCapability.ToCSIRepresentation(),
|
||||
Readonly: r.Readonly,
|
||||
Secrets: r.Secrets,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *NodePublishVolumeRequest) Validate() error {
|
||||
if r.VolumeID == "" {
|
||||
return errors.New("missing VolumeID")
|
||||
}
|
||||
|
||||
if r.TargetPath == "" {
|
||||
return errors.New("missing TargetPath")
|
||||
}
|
||||
|
||||
if r.VolumeCapability == nil {
|
||||
return errors.New("missing VolumeCapabilities")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type PluginCapabilitySet struct {
|
||||
hasControllerService bool
|
||||
hasTopologies bool
|
||||
|
||||
@@ -85,9 +85,10 @@ type NodeClient struct {
|
||||
NextGetInfoResponse *csipbv1.NodeGetInfoResponse
|
||||
NextStageVolumeResponse *csipbv1.NodeStageVolumeResponse
|
||||
NextUnstageVolumeResponse *csipbv1.NodeUnstageVolumeResponse
|
||||
NextPublishVolumeResponse *csipbv1.NodePublishVolumeResponse
|
||||
}
|
||||
|
||||
// NewNodeClient returns a new ControllerClient
|
||||
// NewNodeClient returns a new stub NodeClient
|
||||
func NewNodeClient() *NodeClient {
|
||||
return &NodeClient{}
|
||||
}
|
||||
@@ -98,6 +99,7 @@ func (f *NodeClient) Reset() {
|
||||
f.NextGetInfoResponse = nil
|
||||
f.NextStageVolumeResponse = nil
|
||||
f.NextUnstageVolumeResponse = nil
|
||||
f.NextPublishVolumeResponse = nil
|
||||
}
|
||||
|
||||
func (c *NodeClient) NodeGetCapabilities(ctx context.Context, in *csipbv1.NodeGetCapabilitiesRequest, opts ...grpc.CallOption) (*csipbv1.NodeGetCapabilitiesResponse, error) {
|
||||
@@ -115,3 +117,7 @@ func (c *NodeClient) NodeStageVolume(ctx context.Context, in *csipbv1.NodeStageV
|
||||
func (c *NodeClient) NodeUnstageVolume(ctx context.Context, in *csipbv1.NodeUnstageVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodeUnstageVolumeResponse, error) {
|
||||
return c.NextUnstageVolumeResponse, c.NextErr
|
||||
}
|
||||
|
||||
func (c *NodeClient) NodePublishVolume(ctx context.Context, in *csipbv1.NodePublishVolumeRequest, opts ...grpc.CallOption) (*csipbv1.NodePublishVolumeResponse, error) {
|
||||
return c.NextPublishVolumeResponse, c.NextErr
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user