From 01b69ef7bbeef5e5e352dadff58433a7623770fd Mon Sep 17 00:00:00 2001
From: Danielle Lancashire
Date: Fri, 14 Feb 2020 13:34:41 +0100
Subject: [PATCH] csi: Unpublish volumes during ar.Postrun

This commit introduces initial support for unmounting CSI volumes. It
takes a relatively simplistic approach to performing
NodeUnpublishVolume calls, optimising for cleaning up any leftover
state rather than terminating early on errors: unmounting happens
during an allocation's shutdown flow, and may not always have a
corresponding `NodePublishVolume` call that succeeded.
---
 client/allocrunner/csi_hook.go                | 36 ++++++++++++
 client/pluginmanager/csimanager/volume.go     | 49 +++++++++++++++-
 .../pluginmanager/csimanager/volume_test.go   | 56 +++++++++++++++++++
 3 files changed, 140 insertions(+), 1 deletion(-)

diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go
index aa281d2cc..d618eb85a 100644
--- a/client/allocrunner/csi_hook.go
+++ b/client/allocrunner/csi_hook.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	hclog "github.com/hashicorp/go-hclog"
+	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -58,6 +59,41 @@ func (c *csiHook) Prerun() error {
 	return nil
 }
 
+func (c *csiHook) Postrun() error {
+	if !c.shouldRun() {
+		return nil
+	}
+
+	ctx := context.TODO()
+	volumes, err := c.csiVolumesFromAlloc()
+	if err != nil {
+		return err
+	}
+
+	// For Postrun, we accumulate all unmount errors rather than stopping on the
+	// first failure, because we want to make a best effort to free all storage.
+	// In some cases there may be spurious errors from volumes that never mounted
+	// correctly during prerun when an alloc has failed, and an unmount may also
+	// fail because a volume was externally deleted while in use by this alloc.
+	var result *multierror.Error
+
+	for _, volume := range volumes {
+		mounter, err := c.csimanager.MounterForVolume(ctx, volume)
+		if err != nil {
+			result = multierror.Append(result, err)
+			continue
+		}
+
+		err = mounter.UnmountVolume(ctx, volume, c.alloc)
+		if err != nil {
+			result = multierror.Append(result, err)
+			continue
+		}
+	}
+
+	return result.ErrorOrNil()
+}
+
 // csiVolumesFromAlloc finds all the CSI Volume requests from the allocation's
 // task group and then fetches them from the Nomad Server, before returning
 // them in the form of map[RequestedAlias]*structs.CSIVolume.
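The Postrun hook above leans on go-multierror's tolerance for a nil
*multierror.Error: Append accepts a nil first argument, and ErrorOrNil on a
nil receiver returns nil, so the happy path still surfaces as a plain nil
error. A standalone sketch of the accumulate-and-continue pattern follows;
it is not part of the patch, and the unmount helper and volume IDs are
hypothetical stand-ins for mounter.UnmountVolume and real volume requests.

// Standalone sketch, not part of the patch.
package main

import (
	"errors"
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

// unmount is a hypothetical stand-in for mounter.UnmountVolume.
func unmount(id string) error {
	if id == "vol-b" {
		return errors.New("device busy")
	}
	return nil
}

func main() {
	// nil is a valid zero value; Append treats it as an empty error list.
	var result *multierror.Error

	for _, id := range []string{"vol-a", "vol-b", "vol-c"} {
		// Best effort: record the failure and keep going so the remaining
		// volumes still get a chance to unmount.
		if err := unmount(id); err != nil {
			result = multierror.Append(result, err)
		}
	}

	// ErrorOrNil is safe on a nil *multierror.Error and returns nil when
	// nothing was appended, so callers see an ordinary error value.
	fmt.Println(result.ErrorOrNil())
}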
diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go
index 2650e2fa0..1d61112c4 100644
--- a/client/pluginmanager/csimanager/volume.go
+++ b/client/pluginmanager/csimanager/volume.go
@@ -9,6 +9,7 @@ import (
 
 	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
 	"github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/helper/mount"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/csi"
@@ -273,11 +274,57 @@ func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolum
 	)
 }
 
+func combineErrors(maybeErrs ...error) error {
+	var result *multierror.Error
+	for _, err := range maybeErrs {
+		if err == nil {
+			continue
+		}
+
+		result = multierror.Append(result, err)
+	}
+
+	return result.ErrorOrNil()
+}
+
+func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) error {
+	pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc)
+
+	rpcErr := v.plugin.NodeUnpublishVolume(ctx, vol.ID, pluginTargetPath,
+		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
+		grpc_retry.WithMax(3),
+		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
+	)
+
+	hostTargetPath := v.allocDirForVolume(v.mountRoot, vol, alloc)
+	if _, err := os.Stat(hostTargetPath); os.IsNotExist(err) {
+		// Host Target Path already got destroyed, just return any rpcErr
+		return rpcErr
+	}
+
+	// Host Target Path was not cleaned up, attempt to do so here. If it's
+	// still a mount then removing the dir will fail and we'll return both the
+	// rpcErr and the file error.
+	rmErr := os.Remove(hostTargetPath)
+	if rmErr != nil {
+		return combineErrors(rpcErr, rmErr)
+	}
+
+	// We successfully removed the directory, so return any rpcErr that was
+	// encountered, but because we got here it was probably transient or the
+	// mount was cleaned up externally. We might want to just return `nil`
+	// here in the future.
+	return rpcErr
+}
+
 func (v *volumeManager) UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) error {
 	logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
 	ctx = hclog.WithContext(ctx, logger)
 
-	// TODO(GH-7030): NodeUnpublishVolume
+	err := v.unpublishVolume(ctx, vol, alloc)
+	if err != nil {
+		return err
+	}
 
 	if !v.requiresStaging {
 		return nil
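One detail worth calling out in unpublishVolume: os.Remove only deletes an
empty directory, and the kernel refuses to remove a live mountpoint (EBUSY
on Linux), so a failed Remove is precisely what signals that the unpublish
did not actually release the mount. A standalone sketch of that decision
flow follows; it is not part of the patch, and cleanupHostPath is a
hypothetical helper standing in for the tail of the method above.

// Standalone sketch, not part of the patch.
package main

import (
	"fmt"
	"os"
)

// cleanupHostPath is a hypothetical helper mirroring unpublishVolume's
// host-path cleanup logic.
func cleanupHostPath(hostTargetPath string, rpcErr error) error {
	// If the path is already gone there is nothing left to clean up; any
	// RPC error is the only thing worth reporting.
	if _, err := os.Stat(hostTargetPath); os.IsNotExist(err) {
		return rpcErr
	}

	// A failure here means the directory is non-empty or still a live
	// mountpoint, i.e. the unpublish did not release it. The patch merges
	// both errors via combineErrors; a plain wrap stands in for that here.
	if rmErr := os.Remove(hostTargetPath); rmErr != nil {
		return fmt.Errorf("rpc error: %v; cleanup error: %v", rpcErr, rmErr)
	}

	return rpcErr
}

func main() {
	dir, err := os.MkdirTemp("", "csi-sketch")
	if err != nil {
		panic(err)
	}
	fmt.Println(cleanupHostPath(dir, nil)) // <nil>: empty dir removed
}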
diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go
index b5b74c142..5b4a9bafe 100644
--- a/client/pluginmanager/csimanager/volume_test.go
+++ b/client/pluginmanager/csimanager/volume_test.go
@@ -290,3 +290,59 @@ func TestVolumeManager_publishVolume(t *testing.T) {
 		})
 	}
 }
+
+func TestVolumeManager_unpublishVolume(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		Name                 string
+		Allocation           *structs.Allocation
+		Volume               *structs.CSIVolume
+		PluginErr            error
+		ExpectedErr          error
+		ExpectedCSICallCount int64
+	}{
+		{
+			Name:       "Returns an error when the plugin returns an error",
+			Allocation: structs.MockAlloc(),
+			Volume: &structs.CSIVolume{
+				ID: "foo",
+			},
+			PluginErr:            errors.New("Some Unknown Error"),
+			ExpectedErr:          errors.New("Some Unknown Error"),
+			ExpectedCSICallCount: 1,
+		},
+		{
+			Name:       "Happy Path",
+			Allocation: structs.MockAlloc(),
+			Volume: &structs.CSIVolume{
+				ID: "foo",
+			},
+			PluginErr:            nil,
+			ExpectedErr:          nil,
+			ExpectedCSICallCount: 1,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.Name, func(t *testing.T) {
+			tmpPath := tmpDir(t)
+			defer os.RemoveAll(tmpPath)
+
+			csiFake := &csifake.Client{}
+			csiFake.NextNodeUnpublishVolumeErr = tc.PluginErr
+
+			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
+			ctx := context.Background()
+
+			err := manager.unpublishVolume(ctx, tc.Volume, tc.Allocation)
+
+			if tc.ExpectedErr != nil {
+				require.EqualError(t, err, tc.ExpectedErr.Error())
+			} else {
+				require.NoError(t, err)
+			}
+
+			require.Equal(t, tc.ExpectedCSICallCount, csiFake.NodeUnpublishVolumeCallCount)
+		})
+	}
+}
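The test drives both paths through a preloaded fake: setting
NextNodeUnpublishVolumeErr injects the plugin error, and
NodeUnpublishVolumeCallCount proves the RPC was attempted exactly once even
when it fails. A minimal standalone sketch of that fake-client pattern
follows; the real fake is the csifake package imported above, whose
NodeUnpublishVolume takes more parameters than this trimmed, illustrative
signature.

// Standalone sketch, not part of the patch.
package main

import "fmt"

// FakeClient mirrors the two fields the test uses; everything else about
// Nomad's real fake is omitted.
type FakeClient struct {
	NextNodeUnpublishVolumeErr   error
	NodeUnpublishVolumeCallCount int64
}

// NodeUnpublishVolume records the call and returns whatever error the test
// preloaded, letting assertions cover both the error path and call count.
func (c *FakeClient) NodeUnpublishVolume(volID, targetPath string) error {
	c.NodeUnpublishVolumeCallCount++
	return c.NextNodeUnpublishVolumeErr
}

func main() {
	fake := &FakeClient{NextNodeUnpublishVolumeErr: fmt.Errorf("boom")}
	err := fake.NodeUnpublishVolume("foo", "/tmp/target")
	fmt.Println(err, fake.NodeUnpublishVolumeCallCount) // boom 1
}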