diff --git a/client/csi_controller_endpoint.go b/client/csi_controller_endpoint.go index d1c25c3f0..c67bdd0cb 100644 --- a/client/csi_controller_endpoint.go +++ b/client/csi_controller_endpoint.go @@ -6,6 +6,7 @@ import ( "time" metrics "github.com/armon/go-metrics" + grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry" "github.com/hashicorp/nomad/client/dynamicplugins" "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/plugins/csi" @@ -55,7 +56,13 @@ func (c *CSIController) ValidateVolume(req *structs.ClientCSIControllerValidateV ctx, cancelFn := c.requestContext() defer cancelFn() - return plugin.ControllerValidateCapabilties(ctx, req.VolumeID, caps) + + // CSI ValidateVolumeCapabilities errors for timeout, codes.Unavailable and + // codes.ResourceExhausted are retried; all other errors are fatal. + return plugin.ControllerValidateCapabilities(ctx, req.VolumeID, caps, + grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout), + grpc_retry.WithMax(3), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond))) } // AttachVolume is used to attach a volume from a CSI Cluster to @@ -95,7 +102,12 @@ func (c *CSIController) AttachVolume(req *structs.ClientCSIControllerAttachVolum // Submit the request for a volume to the CSI Plugin. ctx, cancelFn := c.requestContext() defer cancelFn() - cresp, err := plugin.ControllerPublishVolume(ctx, csiReq) + // CSI ControllerPublishVolume errors for timeout, codes.Unavailable and + // codes.ResourceExhausted are retried; all other errors are fatal. + cresp, err := plugin.ControllerPublishVolume(ctx, csiReq, + grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout), + grpc_retry.WithMax(3), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond))) if err != nil { return err } @@ -132,7 +144,12 @@ func (c *CSIController) DetachVolume(req *structs.ClientCSIControllerDetachVolum // Submit the request for a volume to the CSI Plugin. ctx, cancelFn := c.requestContext() defer cancelFn() - _, err = plugin.ControllerUnpublishVolume(ctx, csiReq) + // CSI ControllerUnpublishVolume errors for timeout, codes.Unavailable and + // codes.ResourceExhausted are retried; all other errors are fatal. + _, err = plugin.ControllerUnpublishVolume(ctx, csiReq, + grpc_retry.WithPerRetryTimeout(CSIPluginRequestTimeout), + grpc_retry.WithMax(3), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond))) if err != nil { return err } diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index 6243af4b6..c6be8ad79 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -160,11 +160,8 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, return err } - // We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable) - // as fatal. - // In the future, we can provide more useful error messages based on - // different types of error. For error documentation see: - // https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors + // CSI NodeStageVolume errors for timeout, codes.Unavailable and + // codes.ResourceExhausted are retried; all other errors are fatal. return v.plugin.NodeStageVolume(ctx, vol.ID, publishContext, @@ -199,6 +196,8 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum return nil, err } + // CSI NodePublishVolume errors for timeout, codes.Unavailable and + // codes.ResourceExhausted are retried; all other errors are fatal. err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ VolumeID: vol.RemoteID(), PublishContext: publishContext, @@ -249,6 +248,9 @@ func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolum logger := hclog.FromContext(ctx) logger.Trace("Unstaging volume") stagingPath := v.stagingDirForVolume(v.containerMountPoint, vol, usage) + + // CSI NodeUnstageVolume errors for timeout, codes.Unavailable and + // codes.ResourceExhausted are retried; all other errors are fatal. return v.plugin.NodeUnstageVolume(ctx, vol.ID, stagingPath, @@ -274,6 +276,8 @@ func combineErrors(maybeErrs ...error) error { func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) error { pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc, usage) + // CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and + // codes.ResourceExhausted are retried; all other errors are fatal. rpcErr := v.plugin.NodeUnpublishVolume(ctx, vol.ID, pluginTargetPath, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), grpc_retry.WithMax(3), diff --git a/plugins/csi/client.go b/plugins/csi/client.go index d558b0331..d0e3e40f3 100644 --- a/plugins/csi/client.go +++ b/plugins/csi/client.go @@ -129,7 +129,11 @@ func newGrpcConn(addr string, logger hclog.Logger) (*grpc.ClientConn, error) { // PluginInfo describes the type and version of a plugin as required by the nomad // base.BasePlugin interface. func (c *client) PluginInfo() (*base.PluginInfoResponse, error) { - name, version, err := c.PluginGetInfo(context.TODO()) + // note: no grpc retries needed here, as this is called in + // fingerprinting and will get retried by the caller. + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + defer cancel() + name, version, err := c.PluginGetInfo(ctx) if err != nil { return nil, err } @@ -155,6 +159,7 @@ func (c *client) SetConfig(_ *base.Config) error { } func (c *client) PluginProbe(ctx context.Context) (bool, error) { + // note: no grpc retries should be done here req, err := c.identityClient.Probe(ctx, &csipbv1.ProbeRequest{}) if err != nil { return false, err @@ -205,7 +210,10 @@ func (c *client) PluginGetCapabilities(ctx context.Context) (*PluginCapabilitySe return nil, fmt.Errorf("Client not initialized") } - resp, err := c.identityClient.GetPluginCapabilities(ctx, &csipbv1.GetPluginCapabilitiesRequest{}) + // note: no grpc retries needed here, as this is called in + // fingerprinting and will get retried by the caller + resp, err := c.identityClient.GetPluginCapabilities(ctx, + &csipbv1.GetPluginCapabilitiesRequest{}) if err != nil { return nil, err } @@ -225,7 +233,10 @@ func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapa return nil, fmt.Errorf("controllerClient not initialized") } - resp, err := c.controllerClient.ControllerGetCapabilities(ctx, &csipbv1.ControllerGetCapabilitiesRequest{}) + // note: no grpc retries needed here, as this is called in + // fingerprinting and will get retried by the caller + resp, err := c.controllerClient.ControllerGetCapabilities(ctx, + &csipbv1.ControllerGetCapabilitiesRequest{}) if err != nil { return nil, err } @@ -233,7 +244,7 @@ func (c *client) ControllerGetCapabilities(ctx context.Context) (*ControllerCapa return NewControllerCapabilitySet(resp), nil } -func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error) { +func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error) { if c == nil { return nil, fmt.Errorf("Client not initialized") } @@ -247,7 +258,7 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub } pbrequest := req.ToCSIRepresentation() - resp, err := c.controllerClient.ControllerPublishVolume(ctx, pbrequest) + resp, err := c.controllerClient.ControllerPublishVolume(ctx, pbrequest, opts...) if err != nil { return nil, err } @@ -257,7 +268,7 @@ func (c *client) ControllerPublishVolume(ctx context.Context, req *ControllerPub }, nil } -func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error) { +func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error) { if c == nil { return nil, fmt.Errorf("Client not initialized") } @@ -270,7 +281,7 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU } upbrequest := req.ToCSIRepresentation() - _, err = c.controllerClient.ControllerUnpublishVolume(ctx, upbrequest) + _, err = c.controllerClient.ControllerUnpublishVolume(ctx, upbrequest, opts...) if err != nil { return nil, err } @@ -278,7 +289,7 @@ func (c *client) ControllerUnpublishVolume(ctx context.Context, req *ControllerU return &ControllerUnpublishVolumeResponse{}, nil } -func (c *client) ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *VolumeCapability) error { +func (c *client) ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *VolumeCapability, opts ...grpc.CallOption) error { if c == nil { return fmt.Errorf("Client not initialized") } @@ -301,7 +312,7 @@ func (c *client) ControllerValidateCapabilties(ctx context.Context, volumeID str }, } - resp, err := c.controllerClient.ValidateVolumeCapabilities(ctx, req) + resp, err := c.controllerClient.ValidateVolumeCapabilities(ctx, req, opts...) if err != nil { return err } @@ -329,6 +340,8 @@ func (c *client) NodeGetCapabilities(ctx context.Context) (*NodeCapabilitySet, e return nil, fmt.Errorf("Client not initialized") } + // note: no grpc retries needed here, as this is called in + // fingerprinting and will get retried by the caller resp, err := c.nodeClient.NodeGetCapabilities(ctx, &csipbv1.NodeGetCapabilitiesRequest{}) if err != nil { return nil, err @@ -347,6 +360,8 @@ func (c *client) NodeGetInfo(ctx context.Context) (*NodeGetInfoResponse, error) result := &NodeGetInfoResponse{} + // note: no grpc retries needed here, as this is called in + // fingerprinting and will get retried by the caller resp, err := c.nodeClient.NodeGetInfo(ctx, &csipbv1.NodeGetInfoRequest{}) if err != nil { return nil, err diff --git a/plugins/csi/fake/client.go b/plugins/csi/fake/client.go index b971ce260..963cad65f 100644 --- a/plugins/csi/fake/client.go +++ b/plugins/csi/fake/client.go @@ -140,7 +140,7 @@ func (c *Client) ControllerGetCapabilities(ctx context.Context) (*csi.Controller } // ControllerPublishVolume is used to attach a remote volume to a node -func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { +func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerPublishVolumeResponse, error) { c.Mu.Lock() defer c.Mu.Unlock() @@ -150,7 +150,7 @@ func (c *Client) ControllerPublishVolume(ctx context.Context, req *csi.Controlle } // ControllerUnpublishVolume is used to attach a remote volume to a node -func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { +func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*csi.ControllerUnpublishVolumeResponse, error) { c.Mu.Lock() defer c.Mu.Unlock() @@ -159,7 +159,7 @@ func (c *Client) ControllerUnpublishVolume(ctx context.Context, req *csi.Control return c.NextControllerUnpublishVolumeResponse, c.NextControllerUnpublishVolumeErr } -func (c *Client) ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *csi.VolumeCapability) error { +func (c *Client) ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *csi.VolumeCapability, opts ...grpc.CallOption) error { c.Mu.Lock() defer c.Mu.Unlock() diff --git a/plugins/csi/plugin.go b/plugins/csi/plugin.go index 345b9f753..50bd0bc00 100644 --- a/plugins/csi/plugin.go +++ b/plugins/csi/plugin.go @@ -36,14 +36,14 @@ type CSIPlugin interface { ControllerGetCapabilities(ctx context.Context) (*ControllerCapabilitySet, error) // ControllerPublishVolume is used to attach a remote volume to a cluster node. - ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest) (*ControllerPublishVolumeResponse, error) + ControllerPublishVolume(ctx context.Context, req *ControllerPublishVolumeRequest, opts ...grpc.CallOption) (*ControllerPublishVolumeResponse, error) // ControllerUnpublishVolume is used to deattach a remote volume from a cluster node. - ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest) (*ControllerUnpublishVolumeResponse, error) + ControllerUnpublishVolume(ctx context.Context, req *ControllerUnpublishVolumeRequest, opts ...grpc.CallOption) (*ControllerUnpublishVolumeResponse, error) // ControllerValidateCapabilities is used to validate that a volume exists and // supports the requested capability. - ControllerValidateCapabilties(ctx context.Context, volumeID string, capabilities *VolumeCapability) error + ControllerValidateCapabilities(ctx context.Context, volumeID string, capabilities *VolumeCapability, opts ...grpc.CallOption) error // NodeGetCapabilities is used to return the available capabilities from the // Node Service.