CSI: node unmount from the client before unpublish RPC (#11892)

When an allocation stops, the `csi_hook` makes an unpublish RPC to the
servers, which unpublish via the CSI RPCs: first to the node plugins and
then to the controller plugins. The controller RPCs must happen after the
node RPCs so that the node has had a chance to unmount the volume
before the controller tries to detach the associated device.

But the client has local access to the node plugins and can
independently determine whether it's safe to send unpublish RPCs to those
plugins. This allows the server to treat the node plugin as
abandoned if a client is disconnected and `stop_on_client_disconnect`
is set: the server can go ahead and send unpublish RPCs to the
controller plugins, under the assumption that the client will be
trying to unmount the volume on its end first.

Note that the CSI `NodeUnpublishVolume`/`NodeUnstageVolume` RPCs can
return ignorable errors in the case where the volume has already been
unmounted from the node. Handle all other errors by retrying until we
succeed, so as to give operators the opportunity to reschedule a
failed node plugin (e.g., when they accidentally drained a node without
`-ignore-system`). Fan out the work for each volume into its own
goroutine so that we can release a subset of volumes even if only one
is stuck.
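
As an illustration of that fan-out, here is a minimal, self-contained sketch of the pattern (one goroutine per volume, a buffered error channel, unmount before unpublish). The `volumes` slice, the failing "vol-b", and the `unmountWithRetry`/`unpublish` stand-ins are placeholders for this sketch only; the real hook operates on its `volumeRequests` map of `*volumeAndRequest` pairs and collects the results into a `multierror`, as shown in the diff below.

package main

import (
	"errors"
	"fmt"
	"sync"
)

// stand-ins for the real per-volume work: the node-local unmount
// (retried with backoff) and the server-side CSIVolume.Unpublish RPC
func unmountWithRetry(vol string) error { return nil }

func unpublish(vol string) error {
	if vol == "vol-b" {
		return errors.New("unpublish failed for " + vol)
	}
	return nil
}

func main() {
	volumes := []string{"vol-a", "vol-b", "vol-c"}

	var wg sync.WaitGroup
	errs := make(chan error, len(volumes)) // buffered so no goroutine blocks on send

	for _, vol := range volumes {
		wg.Add(1)
		// one goroutine per volume, so a single stuck plugin doesn't
		// hold the other volumes hostage
		go func(vol string) {
			defer wg.Done()
			if err := unmountWithRetry(vol); err != nil {
				errs <- err
				return
			}
			// only unpublish once the node-local unmount has succeeded
			errs <- unpublish(vol)
		}(vol)
	}

	wg.Wait()
	close(errs) // so the range below terminates

	for err := range errs {
		if err != nil {
			fmt.Println("postrun error:", err)
		}
	}
}
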
Tim Gross, committed by GitHub, 2022-01-28 08:30:31 -05:00
commit 8364eda1d7 · parent f2fef6ff07
3 changed files with 143 additions and 34 deletions


@@ -3,6 +3,8 @@ package allocrunner
import (
"context"
"fmt"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
@@ -24,7 +26,9 @@ type csiHook struct {
updater hookResourceSetter
nodeSecret string
volumeRequests map[string]*volumeAndRequest
volumeRequests map[string]*volumeAndRequest
maxBackoffInterval time.Duration
maxBackoffDuration time.Duration
}
// implemented by allocrunner
@@ -42,6 +46,8 @@ func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.M
updater: updater,
nodeSecret: nodeSecret,
volumeRequests: map[string]*volumeAndRequest{},
maxBackoffInterval: time.Minute,
maxBackoffDuration: time.Hour * 24,
}
}
@@ -103,41 +109,43 @@ func (c *csiHook) Postrun() error {
return nil
}
var mErr *multierror.Error
var wg sync.WaitGroup
errs := make(chan error, len(c.volumeRequests))
for _, pair := range c.volumeRequests {
wg.Add(1)
mode := structs.CSIVolumeClaimRead
if !pair.request.ReadOnly {
mode = structs.CSIVolumeClaimWrite
}
// CSI RPCs can potentially fail for a very long time if a
// node plugin has failed. split the work into goroutines so
// that operators could potentially reuse one of a set of
// volumes even if this hook is stuck waiting on the others
go func(pair *volumeAndRequest) {
defer wg.Done()
source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
}
// we can recover an unmount failure if the operator
// brings the plugin back up, so retry every few minutes
// but eventually give up
err := c.unmountWithRetry(pair)
if err != nil {
errs <- err
return
}
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Mode: mode,
State: structs.CSIVolumeClaimStateUnpublishing,
},
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.nodeSecret,
},
}
err := c.rpcClient.RPC("CSIVolume.Unpublish",
req, &structs.CSIVolumeUnpublishResponse{})
if err != nil {
mErr = multierror.Append(mErr, err)
}
// we can't recover from this RPC error client-side; the
// volume claim GC job will have to clean up for us once
// the allocation is marked terminal
errs <- c.unpublish(pair)
}(pair)
}
wg.Wait()
close(errs) // so we don't block waiting if there were no errors
var mErr *multierror.Error
for err := range errs {
mErr = multierror.Append(mErr, err)
}
return mErr.ErrorOrNil()
}
@@ -231,3 +239,95 @@ func (c *csiHook) shouldRun() bool {
return false
}
func (c *csiHook) unpublish(pair *volumeAndRequest) error {
mode := structs.CSIVolumeClaimRead
if !pair.request.ReadOnly {
mode = structs.CSIVolumeClaimWrite
}
source := pair.request.Source
if pair.request.PerAlloc {
// NOTE: PerAlloc can't be set if we have canaries
source = source + structs.AllocSuffix(c.alloc.Name)
}
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: source,
Claim: &structs.CSIVolumeClaim{
AllocationID: c.alloc.ID,
NodeID: c.alloc.NodeID,
Mode: mode,
State: structs.CSIVolumeClaimStateUnpublishing,
},
WriteRequest: structs.WriteRequest{
Region: c.alloc.Job.Region,
Namespace: c.alloc.Job.Namespace,
AuthToken: c.nodeSecret,
},
}
return c.rpcClient.RPC("CSIVolume.Unpublish",
req, &structs.CSIVolumeUnpublishResponse{})
}
// unmountWithRetry tries to unmount/unstage the volume, retrying with
// exponential backoff capped to a maximum interval
func (c *csiHook) unmountWithRetry(pair *volumeAndRequest) error {
// note: allocrunner hooks don't have access to the client's
// shutdown context, just the allocrunner's shutdown; if we make
// it available in the future we should thread it through here so
// that retry can exit gracefully instead of dropping the
// in-flight goroutine
ctx, cancel := context.WithTimeout(context.TODO(), c.maxBackoffDuration)
defer cancel()
var err error
backoff := time.Second
ticker := time.NewTicker(backoff)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return err
case <-ticker.C:
}
err = c.unmountImpl(pair)
if err == nil {
break
}
if backoff < c.maxBackoffInterval {
backoff = backoff * 2
if backoff > c.maxBackoffInterval {
backoff = c.maxBackoffInterval
}
}
ticker.Reset(backoff)
}
return nil
}
// unmountImpl implements the call to the CSI plugin manager to
// unmount the volume. Each retry will write an "Unmount volume"
// NodeEvent
func (c *csiHook) unmountImpl(pair *volumeAndRequest) error {
mounter, err := c.csimanager.MounterForPlugin(context.TODO(), pair.volume.PluginID)
if err != nil {
return err
}
usageOpts := &csimanager.UsageOptions{
ReadOnly: pair.request.ReadOnly,
AttachmentMode: pair.request.AttachmentMode,
AccessMode: pair.request.AccessMode,
MountOptions: pair.request.MountOptions,
}
return mounter.UnmountVolume(context.TODO(),
pair.volume.ID, pair.volume.RemoteID(), c.alloc.ID, usageOpts)
}


@@ -5,6 +5,7 @@ import (
"fmt"
"path/filepath"
"testing"
"time"
"github.com/stretchr/testify/require"
@@ -59,7 +60,7 @@ func TestCSIHook(t *testing.T) {
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 0, // not until this is done client-side
expectedUnmountCalls: 1,
expectedClaimCalls: 1,
expectedUnpublishCalls: 1,
},
@@ -83,7 +84,7 @@ func TestCSIHook(t *testing.T) {
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMountCalls: 1,
expectedUnmountCalls: 0, // not until this is done client-side
expectedUnmountCalls: 1,
expectedClaimCalls: 1,
expectedUnpublishCalls: 1,
},
@@ -122,7 +123,7 @@ func TestCSIHook(t *testing.T) {
// "test-alloc-dir/%s/testvolume0/ro-file-system-multi-node-reader-only", alloc.ID)},
// },
// expectedMountCalls: 1,
// expectedUnmountCalls: 0, // not until this is done client-side
// expectedUnmountCalls: 1,
// expectedClaimCalls: 1,
// expectedUnpublishCalls: 1,
// },
@@ -144,6 +145,9 @@ func TestCSIHook(t *testing.T) {
},
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar, "secret")
hook.maxBackoffInterval = 100 * time.Millisecond
hook.maxBackoffDuration = 2 * time.Second
require.NotNil(t, hook)
require.NoError(t, hook.Prerun())


@@ -353,11 +353,16 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
}
}
if errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
logger.Trace("unmounting volume failed with ignorable error", "error", err)
err = nil
}
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemStorage).
SetMessage("Unmount volume").
AddDetail("volume_id", volID)
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
if err == nil {
event.AddDetail("success", "true")
} else {
event.AddDetail("success", "false")