From bead8dc8fcdfead7c0de53cc2ef6ad229818739f Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Fri, 31 Jan 2020 14:45:48 +0100 Subject: [PATCH] volume_manager: Add support for publishing volumes --- client/pluginmanager/csimanager/interface.go | 2 + client/pluginmanager/csimanager/volume.go | 125 ++++++++++++++---- .../pluginmanager/csimanager/volume_test.go | 62 ++++++++- 3 files changed, 162 insertions(+), 27 deletions(-) diff --git a/client/pluginmanager/csimanager/interface.go b/client/pluginmanager/csimanager/interface.go index c266e2842..b969ff13e 100644 --- a/client/pluginmanager/csimanager/interface.go +++ b/client/pluginmanager/csimanager/interface.go @@ -13,6 +13,8 @@ var ( ) type MountInfo struct { + Source string + IsDevice bool } type VolumeMounter interface { diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index 2701e9159..b7838a369 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -59,6 +59,10 @@ func (v *volumeManager) stagingDirForVolume(vol *structs.CSIVolume) string { return filepath.Join(v.mountRoot, StagingDirName, vol.ID, "todo-provide-usage-options") } +func (v *volumeManager) allocDirForVolume(vol *structs.CSIVolume, alloc *structs.Allocation) string { + return filepath.Join(v.mountRoot, AllocSpecificDirName, alloc.ID, vol.ID, "todo-provide-usage-options") +} + // ensureStagingDir attempts to create a directory for use when staging a volume // and then validates that the path is not already a mount point for e.g an // existing volume stage. @@ -83,23 +87,31 @@ func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (bool, string, return !isNotMount, stagingPath, nil } -// stageVolume prepares a volume for use by allocations. When a plugin exposes -// the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a -// given usage mode before the volume can be NodePublish-ed. -func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) error { - logger := hclog.FromContext(ctx) - logger.Trace("Preparing volume staging environment") - existingMount, stagingPath, err := v.ensureStagingDir(vol) +// ensureAllocDir attempts to create a directory for use when publishing a volume +// and then validates that the path is not already a mount point (e.g when reattaching +// to existing allocs). +// +// Returns whether the directory is a pre-existing mountpoint, the publish path, +// and any errors that occurred. +func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Allocation) (bool, string, error) { + allocPath := v.allocDirForVolume(vol, alloc) + + // Make the alloc path, owned by the Nomad User + if err := os.MkdirAll(allocPath, 0700); err != nil && !os.IsExist(err) { + return false, "", fmt.Errorf("failed to create allocation directory for volume (%s): %v", vol.ID, err) + } + + // Validate that it is not already a mount point + m := mount.New() + isNotMount, err := m.IsNotAMountPoint(allocPath) if err != nil { - return err - } - logger.Trace("Volume staging environment", "pre-existing_mount", existingMount, "staging_path", stagingPath) - - if existingMount { - logger.Debug("re-using existing staging mount for volume", "staging_path", stagingPath) - return nil + return false, "", fmt.Errorf("mount point detection failed for volume (%s): %v", vol.ID, err) } + return !isNotMount, allocPath, nil +} + +func capabilitiesFromVolume(vol *structs.CSIVolume) (*csi.VolumeCapability, error) { var accessType csi.VolumeAccessType switch vol.AttachmentMode { case structs.CSIVolumeAttachmentModeBlockDevice: @@ -111,7 +123,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) // final check during transformation into the requisite CSI Data type to // defend against development bugs and corrupted state - and incompatible // nomad versions in the future. - return fmt.Errorf("Unknown volume attachment mode: %s", vol.AttachmentMode) + return nil, fmt.Errorf("Unknown volume attachment mode: %s", vol.AttachmentMode) } var accessMode csi.VolumeAccessMode @@ -131,7 +143,38 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) // final check during transformation into the requisite CSI Data type to // defend against development bugs and corrupted state - and incompatible // nomad versions in the future. - return fmt.Errorf("Unknown volume access mode: %v", vol.AccessMode) + return nil, fmt.Errorf("Unknown volume access mode: %v", vol.AccessMode) + } + + return &csi.VolumeCapability{ + AccessType: accessType, + AccessMode: accessMode, + VolumeMountOptions: &csi.VolumeMountOptions{ + // GH-7007: Currently we have no way to provide these + }, + }, nil +} + +// stageVolume prepares a volume for use by allocations. When a plugin exposes +// the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a +// given usage mode before the volume can be NodePublish-ed. +func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) (string, error) { + logger := hclog.FromContext(ctx) + logger.Trace("Preparing volume staging environment") + existingMount, stagingPath, err := v.ensureStagingDir(vol) + if err != nil { + return "", err + } + logger.Trace("Volume staging environment", "pre-existing_mount", existingMount, "staging_path", stagingPath) + + if existingMount { + logger.Debug("re-using existing staging mount for volume", "staging_path", stagingPath) + return stagingPath, nil + } + + capability, err := capabilitiesFromVolume(vol) + if err != nil { + return "", err } // We currently treat all explicit CSI NodeStageVolume errors (aside from timeouts, codes.ResourceExhausted, and codes.Unavailable) @@ -139,23 +182,50 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) // In the future, we can provide more useful error messages based on // different types of error. For error documentation see: // https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors - return v.plugin.NodeStageVolume(ctx, + return stagingPath, v.plugin.NodeStageVolume(ctx, vol.ID, nil, /* TODO: Get publishContext from Server */ stagingPath, - &csi.VolumeCapability{ - AccessType: accessType, - AccessMode: accessMode, - VolumeMountOptions: &csi.VolumeMountOptions{ - // GH-7007: Currently we have no way to provide these - }, - }, + capability, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), grpc_retry.WithMax(3), grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)), ) } +func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, stagingPath string) (*MountInfo, error) { + logger := hclog.FromContext(ctx) + + preexistingMountForAlloc, targetPath, err := v.ensureAllocDir(vol, alloc) + if err != nil { + return nil, err + } + + if preexistingMountForAlloc { + logger.Debug("Re-using existing published volume for allocation") + return &MountInfo{Source: targetPath}, nil + } + + capabilities, err := capabilitiesFromVolume(vol) + if err != nil { + return nil, err + } + + err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ + VolumeID: vol.ID, + PublishContext: nil, // TODO: get publishcontext from server + StagingTargetPath: stagingPath, + TargetPath: targetPath, + VolumeCapability: capabilities, + }, + grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), + grpc_retry.WithMax(3), + grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)), + ) + + return &MountInfo{Source: targetPath}, err +} + // MountVolume performs the steps required for using a given volume // configuration for the provided allocation. // @@ -164,14 +234,17 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID) ctx = hclog.WithContext(ctx, logger) + var stagingPath string + var err error + if v.requiresStaging { - err := v.stageVolume(ctx, vol) + stagingPath, err = v.stageVolume(ctx, vol) if err != nil { return nil, err } } - return nil, fmt.Errorf("Unimplemented") + return v.publishVolume(ctx, vol, alloc, stagingPath) } // unstageVolume is the inverse operation of `stageVolume` and must be called diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go index 687bfbc25..0decc484e 100644 --- a/client/pluginmanager/csimanager/volume_test.go +++ b/client/pluginmanager/csimanager/volume_test.go @@ -167,7 +167,7 @@ func TestVolumeManager_stageVolume(t *testing.T) { manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) ctx := context.Background() - err := manager.stageVolume(ctx, tc.Volume) + _, err := manager.stageVolume(ctx, tc.Volume) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error()) @@ -230,3 +230,63 @@ func TestVolumeManager_unstageVolume(t *testing.T) { }) } } + +func TestVolumeManager_publishVolume(t *testing.T) { + t.Parallel() + cases := []struct { + Name string + Allocation *structs.Allocation + Volume *structs.CSIVolume + PluginErr error + ExpectedErr error + ExpectedCSICallCount int64 + }{ + { + Name: "Returns an error when the plugin returns an error", + Allocation: structs.MockAlloc(), + Volume: &structs.CSIVolume{ + ID: "foo", + AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + }, + PluginErr: errors.New("Some Unknown Error"), + ExpectedErr: errors.New("Some Unknown Error"), + ExpectedCSICallCount: 1, + }, + { + Name: "Happy Path", + Allocation: structs.MockAlloc(), + Volume: &structs.CSIVolume{ + ID: "foo", + AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + }, + PluginErr: nil, + ExpectedErr: nil, + ExpectedCSICallCount: 1, + }, + } + + for _, tc := range cases { + t.Run(tc.Name, func(t *testing.T) { + tmpPath := tmpDir(t) + defer os.RemoveAll(tmpPath) + + csiFake := &csifake.Client{} + csiFake.NextNodePublishVolumeErr = tc.PluginErr + + manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, true) + ctx := context.Background() + + _, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, "") + + if tc.ExpectedErr != nil { + require.EqualError(t, err, tc.ExpectedErr.Error()) + } else { + require.NoError(t, err) + } + + require.Equal(t, tc.ExpectedCSICallCount, csiFake.NodePublishVolumeCallCount) + }) + } +}