mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
csi: add VolumeContext to NodeStage/Publish RPCs (#8239)
In #7957 we added support for passing a volume context to the controller RPCs. This is an opaque map that's created by `CreateVolume` or, in Nomad's case, in the volume registration spec. However, we missed passing this field to the `NodeStage` and `NodePublish` RPC, which prevents certain plugins (such as MooseFS) from making node RPCs.
This commit is contained in:
@@ -163,14 +163,18 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
|
||||
return err
|
||||
}
|
||||
|
||||
req := &csi.NodeStageVolumeRequest{
|
||||
ExternalID: vol.RemoteID(),
|
||||
PublishContext: publishContext,
|
||||
StagingTargetPath: pluginStagingPath,
|
||||
VolumeCapability: capability,
|
||||
Secrets: vol.Secrets,
|
||||
VolumeContext: vol.Context,
|
||||
}
|
||||
|
||||
// CSI NodeStageVolume errors for timeout, codes.Unavailable and
|
||||
// codes.ResourceExhausted are retried; all other errors are fatal.
|
||||
return v.plugin.NodeStageVolume(ctx,
|
||||
vol.RemoteID(),
|
||||
publishContext,
|
||||
pluginStagingPath,
|
||||
capability,
|
||||
vol.Secrets,
|
||||
return v.plugin.NodeStageVolume(ctx, req,
|
||||
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
|
||||
grpc_retry.WithMax(3),
|
||||
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
|
||||
@@ -210,6 +214,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
|
||||
VolumeCapability: capabilities,
|
||||
Readonly: usage.ReadOnly,
|
||||
Secrets: vol.Secrets,
|
||||
VolumeContext: vol.Context,
|
||||
},
|
||||
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
|
||||
grpc_retry.WithMax(3),
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
multierror "github.com/hashicorp/go-multierror"
|
||||
"github.com/hashicorp/nomad/helper"
|
||||
"github.com/hashicorp/nomad/helper/grpc-middleware/logging"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
||||
"google.golang.org/grpc"
|
||||
@@ -496,46 +495,33 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error)
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (c *client) NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error {
|
||||
func (c *client) NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error {
|
||||
if c == nil {
|
||||
return fmt.Errorf("Client not initialized")
|
||||
}
|
||||
if c.nodeClient == nil {
|
||||
return fmt.Errorf("Client not initialized")
|
||||
}
|
||||
|
||||
// These errors should not be returned during production use but exist as aids
|
||||
// during Nomad development
|
||||
if volumeID == "" {
|
||||
return fmt.Errorf("missing volumeID")
|
||||
}
|
||||
if stagingTargetPath == "" {
|
||||
return fmt.Errorf("missing stagingTargetPath")
|
||||
}
|
||||
|
||||
req := &csipbv1.NodeStageVolumeRequest{
|
||||
VolumeId: volumeID,
|
||||
PublishContext: publishContext,
|
||||
StagingTargetPath: stagingTargetPath,
|
||||
VolumeCapability: capabilities.ToCSIRepresentation(),
|
||||
Secrets: secrets,
|
||||
err := req.Validate()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// NodeStageVolume's response contains no extra data. If err == nil, we were
|
||||
// successful.
|
||||
_, err := c.nodeClient.NodeStageVolume(ctx, req, opts...)
|
||||
_, err = c.nodeClient.NodeStageVolume(ctx, req.ToCSIRepresentation(), opts...)
|
||||
if err != nil {
|
||||
code := status.Code(err)
|
||||
switch code {
|
||||
case codes.NotFound:
|
||||
err = fmt.Errorf("volume %q could not be found: %v", volumeID, err)
|
||||
err = fmt.Errorf("volume %q could not be found: %v", req.ExternalID, err)
|
||||
case codes.AlreadyExists:
|
||||
err = fmt.Errorf(
|
||||
"volume %q is already staged to %q but with incompatible capabilities for this request: %v",
|
||||
volumeID, stagingTargetPath, err)
|
||||
req.ExternalID, req.StagingTargetPath, err)
|
||||
case codes.FailedPrecondition:
|
||||
err = fmt.Errorf("volume %q is already published on another node and does not have MULTI_NODE volume capability: %v",
|
||||
volumeID, err)
|
||||
req.ExternalID, err)
|
||||
case codes.Internal:
|
||||
err = fmt.Errorf("node plugin returned an internal error, check the plugin allocation logs for more information: %v", err)
|
||||
}
|
||||
|
||||
@@ -628,8 +628,11 @@ func TestClient_RPC_NodeStageVolume(t *testing.T) {
|
||||
nc.NextErr = tc.ResponseErr
|
||||
nc.NextStageVolumeResponse = tc.Response
|
||||
|
||||
err := client.NodeStageVolume(context.TODO(), "foo", nil, "/path",
|
||||
&VolumeCapability{}, structs.CSISecrets{})
|
||||
err := client.NodeStageVolume(context.TODO(), &NodeStageVolumeRequest{
|
||||
ExternalID: "foo",
|
||||
StagingTargetPath: "/path",
|
||||
VolumeCapability: &VolumeCapability{},
|
||||
})
|
||||
if tc.ExpectedErr != nil {
|
||||
require.EqualError(t, err, tc.ExpectedErr.Error())
|
||||
} else {
|
||||
|
||||
@@ -8,7 +8,6 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/hashicorp/nomad/plugins/base"
|
||||
"github.com/hashicorp/nomad/plugins/csi"
|
||||
"github.com/hashicorp/nomad/plugins/shared/hclspec"
|
||||
@@ -192,7 +191,7 @@ func (c *Client) NodeGetInfo(ctx context.Context) (*csi.NodeGetInfoResponse, err
|
||||
// NodeStageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
|
||||
// to prepare a volume for usage on a host. If err == nil, the response should
|
||||
// be assumed to be successful.
|
||||
func (c *Client) NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *csi.VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error {
|
||||
func (c *Client) NodeStageVolume(ctx context.Context, req *csi.NodeStageVolumeRequest, opts ...grpc.CallOption) error {
|
||||
c.Mu.Lock()
|
||||
defer c.Mu.Unlock()
|
||||
|
||||
|
||||
@@ -56,7 +56,7 @@ type CSIPlugin interface {
|
||||
// NodeStageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
|
||||
// to prepare a volume for usage on a host. If err == nil, the response should
|
||||
// be assumed to be successful.
|
||||
NodeStageVolume(ctx context.Context, volumeID string, publishContext map[string]string, stagingTargetPath string, capabilities *VolumeCapability, secrets structs.CSISecrets, opts ...grpc.CallOption) error
|
||||
NodeStageVolume(ctx context.Context, req *NodeStageVolumeRequest, opts ...grpc.CallOption) error
|
||||
|
||||
// NodeUnstageVolume is used when a plugin has the STAGE_UNSTAGE volume capability
|
||||
// to undo the work performed by NodeStageVolume. If a volume has been staged,
|
||||
@@ -114,6 +114,11 @@ type NodePublishVolumeRequest struct {
|
||||
// Secrets required by plugins to complete the node publish volume
|
||||
// request. This field is OPTIONAL.
|
||||
Secrets structs.CSISecrets
|
||||
|
||||
// Volume context as returned by SP in the CSI
|
||||
// CreateVolumeResponse.Volume.volume_context which we don't implement but
|
||||
// can be entered by hand in the volume spec. This field is OPTIONAL.
|
||||
VolumeContext map[string]string
|
||||
}
|
||||
|
||||
func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVolumeRequest {
|
||||
@@ -129,6 +134,7 @@ func (r *NodePublishVolumeRequest) ToCSIRepresentation() *csipbv1.NodePublishVol
|
||||
VolumeCapability: r.VolumeCapability.ToCSIRepresentation(),
|
||||
Readonly: r.Readonly,
|
||||
Secrets: r.Secrets,
|
||||
VolumeContext: r.VolumeContext,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,6 +154,69 @@ func (r *NodePublishVolumeRequest) Validate() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
type NodeStageVolumeRequest struct {
|
||||
// The external ID of the volume to stage.
|
||||
ExternalID string
|
||||
|
||||
// If the volume was attached via a call to `ControllerPublishVolume` then
|
||||
// we need to provide the returned PublishContext here.
|
||||
PublishContext map[string]string
|
||||
|
||||
// The path to which the volume MAY be staged. It MUST be an
|
||||
// absolute path in the root filesystem of the process serving this
|
||||
// request, and MUST be a directory. The CO SHALL ensure that there
|
||||
// is only one `staging_target_path` per volume. The CO SHALL ensure
|
||||
// that the path is directory and that the process serving the
|
||||
// request has `read` and `write` permission to that directory. The
|
||||
// CO SHALL be responsible for creating the directory if it does not
|
||||
// exist.
|
||||
// This is a REQUIRED field.
|
||||
StagingTargetPath string
|
||||
|
||||
// Volume capability describing how the CO intends to use this volume.
|
||||
VolumeCapability *VolumeCapability
|
||||
|
||||
// Secrets required by plugins to complete the node stage volume
|
||||
// request. This field is OPTIONAL.
|
||||
Secrets structs.CSISecrets
|
||||
|
||||
// Volume context as returned by SP in the CSI
|
||||
// CreateVolumeResponse.Volume.volume_context which we don't implement but
|
||||
// can be entered by hand in the volume spec. This field is OPTIONAL.
|
||||
VolumeContext map[string]string
|
||||
}
|
||||
|
||||
func (r *NodeStageVolumeRequest) ToCSIRepresentation() *csipbv1.NodeStageVolumeRequest {
|
||||
if r == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
return &csipbv1.NodeStageVolumeRequest{
|
||||
VolumeId: r.ExternalID,
|
||||
PublishContext: r.PublishContext,
|
||||
StagingTargetPath: r.StagingTargetPath,
|
||||
VolumeCapability: r.VolumeCapability.ToCSIRepresentation(),
|
||||
Secrets: r.Secrets,
|
||||
VolumeContext: r.VolumeContext,
|
||||
}
|
||||
}
|
||||
|
||||
func (r *NodeStageVolumeRequest) Validate() error {
|
||||
if r.ExternalID == "" {
|
||||
return errors.New("missing volume ID")
|
||||
}
|
||||
|
||||
if r.StagingTargetPath == "" {
|
||||
return errors.New("missing StagingTargetPath")
|
||||
}
|
||||
|
||||
if r.VolumeCapability == nil {
|
||||
return errors.New("missing VolumeCapabilities")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
type PluginCapabilitySet struct {
|
||||
hasControllerService bool
|
||||
hasTopologies bool
|
||||
|
||||
Reference in New Issue
Block a user