csi: add grpc retries to client controller RPCs (#7549)

The CSI Specification defines various gRPC Errors and how they may be retried. After auditing all our CSI RPC calls in #6863, this changeset:

* adds retries and backoffs to the where they were needed but not implemented
* annotates those CSI RPCs that do not need retries so that we don't wonder whether it's been left off accidentally
* added a timeout and cancellation context to the `Probe` call, which didn't have one.
This commit is contained in:
Tim Gross
2020-03-30 16:26:03 -04:00
committed by GitHub
parent a86e575670
commit ffa13adf90
5 changed files with 59 additions and 23 deletions

View File

@@ -6,6 +6,7 @@ import (
"time"
metrics "github.com/armon/go-metrics"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/plugins/csi"
@@ -55,7 +56,13 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV
ctx, cancelFn := c.requestContext()
defer cancelFn()
return plugin.ControllerValidateCapabilties(ctx, req.VolumeID, caps)
// CSI ValidateVolumeCapabilities errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return plugin.ControllerValidateCapabilities(ctx, req.VolumeID, caps,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
}
// AttachVolume is used to attach a volume from a CSI Cluster to
@@ -95,7 +102,12 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum
// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
defer cancelFn()
cresp, err := plugin.ControllerPublishVolume(ctx, csiReq)
// CSI ControllerPublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
cresp, err := plugin.ControllerPublishVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return err
}
@@ -132,7 +144,12 @@ func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolum
// Submit the request for a volume to the CSI Plugin.
ctx, cancelFn := c.requestContext()
defer cancelFn()
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq)
// CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
_, err = plugin.ControllerUnpublishVolume(ctx, csiReq,
grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)))
if err != nil {
return err
}

View File

@@ -160,11 +160,8 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
return err
}
// We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable)
// as fatal.
// In the future, we can provide more useful error messages based on
// different types of error. For error documentation see:
// https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors
// CSI NodeStageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeStageVolume(ctx,
vol.ID,
publishContext,
@@ -199,6 +196,8 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
return nil, err
}
// CSI NodePublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
VolumeID: vol.RemoteID(),
PublishContext: publishContext,
@@ -249,6 +248,9 @@ func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolum
logger := hclog.FromContext(ctx)
logger.Trace("Unstaging volume")
stagingPath := v.stagingDirForVolume(v.containerMountPoint, vol, usage)
// CSI NodeUnstageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeUnstageVolume(ctx,
vol.ID,
stagingPath,
@@ -274,6 +276,8 @@ func combineErrors(maybeErrs ...error) error {
func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) error {
pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc, usage)
// CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
rpcErr := v.plugin.NodeUnpublishVolume(ctx, vol.ID, pluginTargetPath,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),

View File

@@ -129,7 +129,11 @@ func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) {
// PluginInfo describes the type and version of a plugin as required by the nomad
// base.BasePlugin interface.
func (c *client) PluginInfo() (*base.PluginInfoResponse, error) {
name, version, err := c.PluginGetInfo(context.TODO())
// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
name, version, err := c.PluginGetInfo(ctx)
if err != nil {
return nil, err
}
@@ -155,6 +159,7 @@ func (c *client) SetConfig(_ *base.Config) error {
}
func (c *client) PluginProbe(ctx context.Context) (bool, error) {
// note: no grpc retries should be done here
req, err := c.identityClient.Probe(ctx, &csipbv1.ProbeRequest{})
if err != nil {
return false, err
@@ -205,7 +210,10 @@ func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySe
return nil, fmt.Errorf("Client not initialized")
}
resp, err := c.identityClient.GetPluginCapabilities(ctx, &csipbv1.GetPluginCapabilitiesRequest{})
// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller
resp, err := c.identityClient.GetPluginCapabilities(ctx,
&csipbv1.GetPluginCapabilitiesRequest{})
if err != nil {
return nil, err
}
@@ -225,7 +233,10 @@ func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapa
return nil, fmt.Errorf("controllerClient not initialized")
}
resp, err := c.controllerClient.ControllerGetCapabilities(ctx, &csipbv1.ControllerGetCapabilitiesRequest{})
// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller
resp, err := c.controllerClient.ControllerGetCapabilities(ctx,
&csipbv1.ControllerGetCapabilitiesRequest{})
if err != nil {
return nil, err
}
@@ -233,7 +244,7 @@ func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapa
return NewControllerCapabilitySet(resp), nil
}
func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error) {
func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error) {
if c == nil {
return nil, fmt.Errorf("Client not initialized")
}
@@ -247,7 +258,7 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub
}
pbrequest := req.ToCSIRepresentation()
resp, err := c.controllerClient.ControllerPublishVolume(ctx, pbrequest)
resp, err := c.controllerClient.ControllerPublishVolume(ctx, pbrequest, opts...)
if err != nil {
return nil, err
}
@@ -257,7 +268,7 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub
}, nil
}
func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error) {
func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error) {
if c == nil {
return nil, fmt.Errorf("Client not initialized")
}
@@ -270,7 +281,7 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU
}
upbrequest := req.ToCSIRepresentation()
_, err = c.controllerClient.ControllerUnpublishVolume(ctx, upbrequest)
_, err = c.controllerClient.ControllerUnpublishVolume(ctx, upbrequest, opts...)
if err != nil {
return nil, err
}
@@ -278,7 +289,7 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU
return &ControllerUnpublishVolumeResponse{}, nil
}
func (c *client) ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *VolumeCapability) error {
func (c *client) ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *VolumeCapability, opts ...grpc.CallOption) error {
if c == nil {
return fmt.Errorf("Client not initialized")
}
@@ -301,7 +312,7 @@ func (c *client) ControllerValidateCapabilties(ctx context.Context, volumeID str
},
}
resp, err := c.controllerClient.ValidateVolumeCapabilities(ctx, req)
resp, err := c.controllerClient.ValidateVolumeCapabilities(ctx, req, opts...)
if err != nil {
return err
}
@@ -329,6 +340,8 @@ func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, e
return nil, fmt.Errorf("Client not initialized")
}
// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller
resp, err := c.nodeClient.NodeGetCapabilities(ctx, &csipbv1.NodeGetCapabilitiesRequest{})
if err != nil {
return nil, err
@@ -347,6 +360,8 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error)
result := &NodeGetInfoResponse{}
// note: no grpc retries needed here, as this is called in
// fingerprinting and will get retried by the caller
resp, err := c.nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{})
if err != nil {
return nil, err

View File

@@ -140,7 +140,7 @@ func (c *Client) ControllerGetCapabilities(ctx context.Context) (*csi.Controller
}
// ControllerPublishVolume is used to attach a remote volume to a node
func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) {
func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerPublishVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
@@ -150,7 +150,7 @@ func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.Controlle
}
// ControllerUnpublishVolume is used to attach a remote volume to a node
func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) {
func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerUnpublishVolumeResponse, error) {
c.Mu.Lock()
defer c.Mu.Unlock()
@@ -159,7 +159,7 @@ func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.Control
return c.NextControllerUnpublishVolumeResponse, c.NextControllerUnpublishVolumeErr
}
func (c *Client) ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *csi.VolumeCapability) error {
func (c *Client) ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *csi.VolumeCapability, opts ...grpc.CallOption) error {
c.Mu.Lock()
defer c.Mu.Unlock()

View File

@@ -36,14 +36,14 @@ type CSIPlugin interface {
ControllerGetCapabilities(ctx context.Context) (*ControllerCapabilitySet, error)
// ControllerPublishVolume is used to attach a remote volume to a cluster node.
ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error)
ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error)
// ControllerUnpublishVolume is used to deattach a remote volume from a cluster node.
ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error)
ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error)
// ControllerValidateCapabilities is used to validate that a volume exists and
// supports the requested capability.
ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *VolumeCapability) error
ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *VolumeCapability, opts ...grpc.CallOption) error
// NodeGetCapabilities is used to return the available capabilities from the
// Node Service.