diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go
index aa281d2cc..d618eb85a 100644
--- a/client/allocrunner/csi_hook.go
+++ b/client/allocrunner/csi_hook.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	hclog "github.com/hashicorp/go-hclog"
+	multierror "github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
@@ -58,6 +59,41 @@ func (c *csiHook) Prerun() error {
 	return nil
 }
 
+func (c *csiHook) Postrun() error {
+	if !c.shouldRun() {
+		return nil
+	}
+
+	ctx := context.TODO()
+	volumes, err := c.csiVolumesFromAlloc()
+	if err != nil {
+		return err
+	}
+
+	// For Postrun, we accumulate all unmount errors rather than stopping on the
+	// first failure. This is because we want to make a best effort to free all
+	// storage, and in some cases there may be spurious errors from volumes that
+	// never mounted correctly during prerun when an alloc has failed. Unmounting
+	// may also fail because a volume was externally deleted while in use by this alloc.
+	var result *multierror.Error
+
+	for _, volume := range volumes {
+		mounter, err := c.csimanager.MounterForVolume(ctx, volume)
+		if err != nil {
+			result = multierror.Append(result, err)
+			continue
+		}
+
+		err = mounter.UnmountVolume(ctx, volume, c.alloc)
+		if err != nil {
+			result = multierror.Append(result, err)
+			continue
+		}
+	}
+
+	return result.ErrorOrNil()
+}
+
 // csiVolumesFromAlloc finds all the CSI Volume requests from the allocation's
 // task group and then fetches them from the Nomad Server, before returning
 // them in the form of map[RequestedAlias]*structs.CSIVolume.
diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go
index 2650e2fa0..1d61112c4 100644
--- a/client/pluginmanager/csimanager/volume.go
+++ b/client/pluginmanager/csimanager/volume.go
@@ -9,6 +9,7 @@ import (
 	grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
 	"github.com/hashicorp/go-hclog"
+	"github.com/hashicorp/go-multierror"
 	"github.com/hashicorp/nomad/helper/mount"
 	"github.com/hashicorp/nomad/nomad/structs"
 	"github.com/hashicorp/nomad/plugins/csi"
 )
@@ -273,11 +274,57 @@ func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolum
 	)
 }
 
+func combineErrors(maybeErrs ...error) error {
+	var result *multierror.Error
+	for _, err := range maybeErrs {
+		if err == nil {
+			continue
+		}
+
+		result = multierror.Append(result, err)
+	}
+
+	return result.ErrorOrNil()
+}
+
+func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) error {
+	pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc)
+
+	rpcErr := v.plugin.NodeUnpublishVolume(ctx, vol.ID, pluginTargetPath,
+		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
+		grpc_retry.WithMax(3),
+		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
+	)
+
+	hostTargetPath := v.allocDirForVolume(v.mountRoot, vol, alloc)
+	if _, err := os.Stat(hostTargetPath); os.IsNotExist(err) {
+		// The host target path was already removed, so just return any rpcErr
+		return rpcErr
+	}
+
+	// The host target path was not cleaned up, so attempt to do so here. If it's
+	// still a mount then removing the dir will fail and we'll return any rpcErr
+	// along with the file error.
+	rmErr := os.Remove(hostTargetPath)
+	if rmErr != nil {
+		return combineErrors(rpcErr, rmErr)
+	}
+
+	// We successfully removed the directory, so return any rpcErr that was
+	// encountered; because we got here, it was probably transient or the mount
+	// was already cleaned up externally. We might want to just return `nil`
+	// here in the future.
+	return rpcErr
+}
+
 func (v *volumeManager) UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) error {
 	logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
 	ctx = hclog.WithContext(ctx, logger)
 
-	// TODO(GH-7030): NodeUnpublishVolume
+	err := v.unpublishVolume(ctx, vol, alloc)
+	if err != nil {
+		return err
+	}
 
 	if !v.requiresStaging {
 		return nil
diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go
index b5b74c142..5b4a9bafe 100644
--- a/client/pluginmanager/csimanager/volume_test.go
+++ b/client/pluginmanager/csimanager/volume_test.go
@@ -290,3 +290,59 @@ func TestVolumeManager_publishVolume(t *testing.T) {
 		})
 	}
 }
+
+func TestVolumeManager_unpublishVolume(t *testing.T) {
+	t.Parallel()
+	cases := []struct {
+		Name                 string
+		Allocation           *structs.Allocation
+		Volume               *structs.CSIVolume
+		PluginErr            error
+		ExpectedErr          error
+		ExpectedCSICallCount int64
+	}{
+		{
+			Name:       "Returns an error when the plugin returns an error",
+			Allocation: structs.MockAlloc(),
+			Volume: &structs.CSIVolume{
+				ID: "foo",
+			},
+			PluginErr:            errors.New("Some Unknown Error"),
+			ExpectedErr:          errors.New("Some Unknown Error"),
+			ExpectedCSICallCount: 1,
+		},
+		{
+			Name:       "Happy Path",
+			Allocation: structs.MockAlloc(),
+			Volume: &structs.CSIVolume{
+				ID: "foo",
+			},
+			PluginErr:            nil,
+			ExpectedErr:          nil,
+			ExpectedCSICallCount: 1,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.Name, func(t *testing.T) {
+			tmpPath := tmpDir(t)
+			defer os.RemoveAll(tmpPath)
+
+			csiFake := &csifake.Client{}
+			csiFake.NextNodeUnpublishVolumeErr = tc.PluginErr
+
+			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
+			ctx := context.Background()
+
+			err := manager.unpublishVolume(ctx, tc.Volume, tc.Allocation)
+
+			if tc.ExpectedErr != nil {
+				require.EqualError(t, err, tc.ExpectedErr.Error())
+			} else {
+				require.NoError(t, err)
+			}
+
+			require.Equal(t, tc.ExpectedCSICallCount, csiFake.NodeUnpublishVolumeCallCount)
+		})
+	}
+}
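Note on the error-handling pattern: both Postrun and combineErrors lean on go-multierror's nil-receiver semantics, where multierror.Append accepts a nil *multierror.Error and ErrorOrNil() returns nil when nothing was appended, so the happy path reports no error without any special casing. Below is a minimal standalone sketch of that best-effort accumulation pattern; bestEffortCleanup and its steps are hypothetical stand-ins for illustration, not the csimanager API.

package main

import (
	"errors"
	"fmt"

	multierror "github.com/hashicorp/go-multierror"
)

// bestEffortCleanup mirrors the Postrun/combineErrors pattern: run every step,
// collect any failures along the way, and report an error only if at least one
// step failed. (Illustrative sketch, not part of this change.)
func bestEffortCleanup(steps []func() error) error {
	var result *multierror.Error
	for _, step := range steps {
		if err := step(); err != nil {
			// Append is safe to call with a nil *multierror.Error.
			result = multierror.Append(result, err)
		}
	}
	// ErrorOrNil returns nil when no errors were appended, so callers can
	// treat the result like any other error value.
	return result.ErrorOrNil()
}

func main() {
	err := bestEffortCleanup([]func() error{
		func() error { return nil },                                    // e.g. a volume that unmounted cleanly
		func() error { return errors.New("volume foo: not mounted") }, // e.g. a volume that was never mounted
	})
	fmt.Println(err)
}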