diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go
index d618eb85a..409c4822a 100644
--- a/client/allocrunner/csi_hook.go
+++ b/client/allocrunner/csi_hook.go
@@ -38,13 +38,19 @@ func (c *csiHook) Prerun() error {
 	}
 
 	mounts := make(map[string]*csimanager.MountInfo, len(volumes))
-	for alias, volume := range volumes {
-		mounter, err := c.csimanager.MounterForVolume(ctx, volume)
+	for alias, pair := range volumes {
+		mounter, err := c.csimanager.MounterForVolume(ctx, pair.volume)
 		if err != nil {
 			return err
 		}
 
-		mountInfo, err := mounter.MountVolume(ctx, volume, c.alloc)
+		usageOpts := &csimanager.UsageOptions{
+			ReadOnly:       pair.request.ReadOnly,
+			AttachmentMode: string(pair.volume.AttachmentMode),
+			AccessMode:     string(pair.volume.AccessMode),
+		}
+
+		mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts)
 		if err != nil {
 			return err
 		}
@@ -77,14 +83,20 @@ func (c *csiHook) Postrun() error {
 	// fail because a volume was externally deleted while in use by this alloc.
 	var result *multierror.Error
 
-	for _, volume := range volumes {
-		mounter, err := c.csimanager.MounterForVolume(ctx, volume)
+	for _, pair := range volumes {
+		mounter, err := c.csimanager.MounterForVolume(ctx, pair.volume)
 		if err != nil {
 			result = multierror.Append(result, err)
 			continue
 		}
 
-		err = mounter.UnmountVolume(ctx, volume, c.alloc)
+		usageOpts := &csimanager.UsageOptions{
+			ReadOnly:       pair.request.ReadOnly,
+			AttachmentMode: string(pair.volume.AttachmentMode),
+			AccessMode:     string(pair.volume.AccessMode),
+		}
+
+		err = mounter.UnmountVolume(ctx, pair.volume, c.alloc, usageOpts)
 		if err != nil {
 			result = multierror.Append(result, err)
 			continue
@@ -94,24 +106,28 @@ func (c *csiHook) Postrun() error {
 	return result.ErrorOrNil()
 }
 
+type volumeAndRequest struct {
+	volume  *structs.CSIVolume
+	request *structs.VolumeRequest
+}
+
 // csiVolumesFromAlloc finds all the CSI Volume requests from the allocation's
 // task group and then fetches them from the Nomad Server, before returning
-// them in the form of map[RequestedAlias]*structs.CSIVolume.
+// them in the form of map[RequestedAlias]*volumeAndRequest.
 //
 // If any volume fails to validate then we return an error.
-func (c *csiHook) csiVolumesFromAlloc() (map[string]*structs.CSIVolume, error) {
-	vols := make(map[string]*structs.VolumeRequest)
+func (c *csiHook) csiVolumesFromAlloc() (map[string]*volumeAndRequest, error) {
+	vols := make(map[string]*volumeAndRequest)
 	tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
 	for alias, vol := range tg.Volumes {
 		if vol.Type == structs.VolumeTypeCSI {
-			vols[alias] = vol
+			vols[alias] = &volumeAndRequest{request: vol}
 		}
 	}
 
-	csiVols := make(map[string]*structs.CSIVolume, len(vols))
-	for alias, request := range vols {
+	for alias, pair := range vols {
 		req := &structs.CSIVolumeGetRequest{
-			ID: request.Source,
+			ID: pair.request.Source,
 		}
 		req.Region = c.alloc.Job.Region
@@ -121,13 +137,13 @@ func (c *csiHook) csiVolumesFromAlloc() (map[string]*structs.CSIVolume, error) {
 		}
 
 		if resp.Volume == nil {
-			return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", request.Source)
+			return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
 		}
 
-		csiVols[alias] = resp.Volume
+		vols[alias].volume = resp.Volume
 	}
 
-	return csiVols, nil
+	return vols, nil
 }
 
 func newCSIHook(logger hclog.Logger, alloc *structs.Allocation, rpcClient RPCer, csi csimanager.Manager, updater hookResourceSetter) *csiHook {
diff --git a/client/pluginmanager/csimanager/interface.go b/client/pluginmanager/csimanager/interface.go
index b969ff13e..413f8ff65 100644
--- a/client/pluginmanager/csimanager/interface.go
+++ b/client/pluginmanager/csimanager/interface.go
@@ -3,6 +3,7 @@ package csimanager
 import (
 	"context"
 	"errors"
+	"strings"
 
 	"github.com/hashicorp/nomad/client/pluginmanager"
 	"github.com/hashicorp/nomad/nomad/structs"
@@ -17,9 +18,31 @@ type MountInfo struct {
 	IsDevice bool
 }
 
+type UsageOptions struct {
+	ReadOnly       bool
+	AttachmentMode string
+	AccessMode     string
+}
+
+func (u *UsageOptions) ToFS() string {
+	var sb strings.Builder
+
+	if u.ReadOnly {
+		sb.WriteString("ro-")
+	} else {
+		sb.WriteString("rw-")
+	}
+
+	sb.WriteString(u.AttachmentMode)
+	sb.WriteString("-")
+	sb.WriteString(u.AccessMode)
+
+	return sb.String()
+}
+
 type VolumeMounter interface {
-	MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) (*MountInfo, error)
-	UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) error
+	MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions) (*MountInfo, error)
+	UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions) error
 }
 
 type Manager interface {
diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go
index 1d61112c4..3d02903d9 100644
--- a/client/pluginmanager/csimanager/volume.go
+++ b/client/pluginmanager/csimanager/volume.go
@@ -61,12 +61,12 @@ func newVolumeManager(logger hclog.Logger, plugin csi.CSIPlugin, rootDir, contai
 	}
 }
 
-func (v *volumeManager) stagingDirForVolume(root string, vol *structs.CSIVolume) string {
-	return filepath.Join(root, StagingDirName, vol.ID, "todo-provide-usage-options")
+func (v *volumeManager) stagingDirForVolume(root string, vol *structs.CSIVolume, usage *UsageOptions) string {
+	return filepath.Join(root, StagingDirName, vol.ID, usage.ToFS())
 }
 
-func (v *volumeManager) allocDirForVolume(root string, vol *structs.CSIVolume, alloc *structs.Allocation) string {
-	return filepath.Join(root, AllocSpecificDirName, alloc.ID, vol.ID, "todo-provide-usage-options")
+func (v *volumeManager) allocDirForVolume(root string, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) string {
+	return filepath.Join(root, AllocSpecificDirName, alloc.ID, vol.ID, usage.ToFS())
 }
 
 // ensureStagingDir attempts to create a directory for use when staging a volume
@@ -75,8 +75,8 @@ func (v *volumeManager) allocDirForVolume(root string, vol *structs.CSIVolume, a
 //
 // Returns whether the directory is a pre-existing mountpoint, the staging path,
 // and any errors that occurred.
-func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (string, bool, error) {
-	stagingPath := v.stagingDirForVolume(v.mountRoot, vol)
+func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume, usage *UsageOptions) (string, bool, error) {
+	stagingPath := v.stagingDirForVolume(v.mountRoot, vol, usage)
 
 	// Make the staging path, owned by the Nomad User
 	if err := os.MkdirAll(stagingPath, 0700); err != nil && !os.IsExist(err) {
@@ -100,8 +100,8 @@ func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume) (string, bool,
 //
 // Returns whether the directory is a pre-existing mountpoint, the publish path,
 // and any errors that occurred.
-func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Allocation) (string, bool, error) {
-	allocPath := v.allocDirForVolume(v.mountRoot, vol, alloc)
+func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (string, bool, error) {
+	allocPath := v.allocDirForVolume(v.mountRoot, vol, alloc, usage)
 
 	// Make the alloc path, owned by the Nomad User
 	if err := os.MkdirAll(allocPath, 0700); err != nil && !os.IsExist(err) {
@@ -165,14 +165,14 @@ func capabilitiesFromVolume(vol *structs.CSIVolume) (*csi.VolumeCapability, erro
 // stageVolume prepares a volume for use by allocations. When a plugin exposes
 // the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a
 // given usage mode before the volume can be NodePublish-ed.
-func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume) error {
+func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions) error {
 	logger := hclog.FromContext(ctx)
 	logger.Trace("Preparing volume staging environment")
-	hostStagingPath, isMount, err := v.ensureStagingDir(vol)
+	hostStagingPath, isMount, err := v.ensureStagingDir(vol, usage)
 	if err != nil {
 		return err
 	}
-	pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol)
+	pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol, usage)
 
 	logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "host_staging_path", hostStagingPath, "plugin_staging_path", pluginStagingPath)
 
@@ -202,18 +202,18 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume)
 	)
 }
 
-func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) (*MountInfo, error) {
+func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (*MountInfo, error) {
 	logger := hclog.FromContext(ctx)
 
 	var pluginStagingPath string
 	if v.requiresStaging {
-		pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol)
+		pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol, usage)
 	}
 
-	hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc)
+	hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc, usage)
 	if err != nil {
 		return nil, err
 	}
-	pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc)
+	pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc, usage)
 
 	if isMount {
 		logger.Debug("Re-using existing published volume for allocation")
@@ -231,6 +231,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
 		StagingTargetPath: pluginStagingPath,
 		TargetPath:        pluginTargetPath,
 		VolumeCapability:  capabilities,
+		Readonly:          usage.ReadOnly,
 	},
 		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
 		grpc_retry.WithMax(3),
@@ -244,27 +245,27 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
 // configuration for the provided allocation.
 //
 // TODO: Validate remote volume attachment and implement.
-func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) (*MountInfo, error) {
+func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (*MountInfo, error) {
 	logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
 	ctx = hclog.WithContext(ctx, logger)
 
 	if v.requiresStaging {
-		if err := v.stageVolume(ctx, vol); err != nil {
+		if err := v.stageVolume(ctx, vol, usage); err != nil {
 			return nil, err
 		}
 	}
 
-	return v.publishVolume(ctx, vol, alloc)
+	return v.publishVolume(ctx, vol, alloc, usage)
 }
 
 // unstageVolume is the inverse operation of `stageVolume` and must be called
 // once for each staging path that a volume has been staged under.
 // It is safe to call multiple times and a plugin is required to return OK if
 // the volume has been unstaged or was never staged on the node.
-func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolume) error {
+func (v *volumeManager) unstageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions) error {
 	logger := hclog.FromContext(ctx)
 	logger.Trace("Unstaging volume")
-	stagingPath := v.stagingDirForVolume(v.containerMountPoint, vol)
+	stagingPath := v.stagingDirForVolume(v.containerMountPoint, vol, usage)
 	return v.plugin.NodeUnstageVolume(ctx,
 		vol.ID,
 		stagingPath,
@@ -287,8 +288,8 @@ func combineErrors(maybeErrs ...error) error {
 	return result.ErrorOrNil()
 }
 
-func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) error {
-	pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc)
+func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) error {
+	pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, vol, alloc, usage)
 
 	rpcErr := v.plugin.NodeUnpublishVolume(ctx, vol.ID, pluginTargetPath,
 		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
@@ -296,7 +297,7 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVol
 		grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
 	)
 
-	hostTargetPath := v.allocDirForVolume(v.mountRoot, vol, alloc)
+	hostTargetPath := v.allocDirForVolume(v.mountRoot, vol, alloc, usage)
 	if _, err := os.Stat(hostTargetPath); os.IsNotExist(err) {
 		// Host Target Path already got destroyed, just return any rpcErr
 		return rpcErr
@@ -317,11 +318,11 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, vol *structs.CSIVol
 	return rpcErr
 }
 
-func (v *volumeManager) UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation) error {
+func (v *volumeManager) UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) error {
 	logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
 	ctx = hclog.WithContext(ctx, logger)
 
-	err := v.unpublishVolume(ctx, vol, alloc)
+	err := v.unpublishVolume(ctx, vol, alloc, usage)
 	if err != nil {
 		return err
 	}
@@ -332,5 +333,5 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, vol *structs.CSIVolum
 
 	// TODO(GH-7029): Implement volume usage tracking and only unstage volumes
 	// when the last alloc stops using it.
-	return v.unstageVolume(ctx, vol)
+	return v.unstageVolume(ctx, vol, usage)
 }
diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go
index 5b4a9bafe..689bd86a7 100644
--- a/client/pluginmanager/csimanager/volume_test.go
+++ b/client/pluginmanager/csimanager/volume_test.go
@@ -27,6 +27,7 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
 	cases := []struct {
 		Name                 string
 		Volume               *structs.CSIVolume
+		UsageOptions         *UsageOptions
 		CreateDirAheadOfTime bool
 		MountDirAheadOfTime  bool
 
@@ -34,21 +35,25 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
 		ExpectedErr        error
 		ExpectedMountState bool
 	}{
 		{
-			Name:   "Creates a directory when one does not exist",
-			Volume: &structs.CSIVolume{ID: "foo"},
+			Name:         "Creates a directory when one does not exist",
+			Volume:       &structs.CSIVolume{ID: "foo"},
+			UsageOptions: &UsageOptions{},
 		},
 		{
 			Name:                 "Does not fail because of a pre-existing directory",
 			Volume:               &structs.CSIVolume{ID: "foo"},
+			UsageOptions:         &UsageOptions{},
 			CreateDirAheadOfTime: true,
 		},
 		{
-			Name:   "Returns negative mount info",
-			Volume: &structs.CSIVolume{ID: "foo"},
+			Name:         "Returns negative mount info",
+			UsageOptions: &UsageOptions{},
+			Volume:       &structs.CSIVolume{ID: "foo"},
 		},
 		{
 			Name:                 "Returns positive mount info",
 			Volume:               &structs.CSIVolume{ID: "foo"},
+			UsageOptions:         &UsageOptions{},
 			CreateDirAheadOfTime: true,
 			MountDirAheadOfTime:  true,
 			ExpectedMountState:   true,
@@ -75,7 +80,7 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
 			csiFake := &csifake.Client{}
 
 			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
-			expectedStagingPath := manager.stagingDirForVolume(tmpPath, tc.Volume)
+			expectedStagingPath := manager.stagingDirForVolume(tmpPath, tc.Volume, tc.UsageOptions)
 
 			if tc.CreateDirAheadOfTime {
 				err := os.MkdirAll(expectedStagingPath, 0700)
@@ -84,7 +89,7 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
 			// Step 3: Now we can do some testing
 
-			path, detectedMount, testErr := manager.ensureStagingDir(tc.Volume)
+			path, detectedMount, testErr := manager.ensureStagingDir(tc.Volume, tc.UsageOptions)
 
 			if tc.ExpectedErr != nil {
 				require.EqualError(t, testErr, tc.ExpectedErr.Error())
 				return // We don't perform extra validation if an error was detected.
@@ -112,10 +117,11 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
 func TestVolumeManager_stageVolume(t *testing.T) {
 	t.Parallel()
 	cases := []struct {
-		Name        string
-		Volume      *structs.CSIVolume
-		PluginErr   error
-		ExpectedErr error
+		Name         string
+		Volume       *structs.CSIVolume
+		UsageOptions *UsageOptions
+		PluginErr    error
+		ExpectedErr  error
 	}{
 		{
 			Name: "Returns an error when an invalid AttachmentMode is provided",
@@ -123,7 +129,8 @@ func TestVolumeManager_stageVolume(t *testing.T) {
 			Volume: &structs.CSIVolume{
 				ID:             "foo",
 				AttachmentMode: "nonsense",
 			},
-			ExpectedErr: errors.New("Unknown volume attachment mode: nonsense"),
+			UsageOptions: &UsageOptions{},
+			ExpectedErr:  errors.New("Unknown volume attachment mode: nonsense"),
 		},
 		{
 			Name: "Returns an error when an invalid AccessMode is provided",
@@ -132,7 +139,8 @@ func TestVolumeManager_stageVolume(t *testing.T) {
 			Volume: &structs.CSIVolume{
 				ID:             "foo",
 				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
 				AccessMode:     "nonsense",
 			},
-			ExpectedErr: errors.New("Unknown volume access mode: nonsense"),
+			UsageOptions: &UsageOptions{},
+			ExpectedErr:  errors.New("Unknown volume access mode: nonsense"),
 		},
 		{
 			Name: "Returns an error when the plugin returns an error",
@@ -141,8 +149,9 @@ func TestVolumeManager_stageVolume(t *testing.T) {
 			Volume: &structs.CSIVolume{
 				ID:             "foo",
 				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
 				AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
 			},
-			PluginErr:   errors.New("Some Unknown Error"),
-			ExpectedErr: errors.New("Some Unknown Error"),
+			UsageOptions: &UsageOptions{},
+			PluginErr:    errors.New("Some Unknown Error"),
+			ExpectedErr:  errors.New("Some Unknown Error"),
 		},
 		{
 			Name: "Happy Path",
@@ -151,8 +160,9 @@ func TestVolumeManager_stageVolume(t *testing.T) {
 			Volume: &structs.CSIVolume{
 				ID:             "foo",
 				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
 				AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
 			},
-			PluginErr:   nil,
-			ExpectedErr: nil,
+			UsageOptions: &UsageOptions{},
+			PluginErr:    nil,
+			ExpectedErr:  nil,
 		},
 	}
@@ -167,7 +177,7 @@ func TestVolumeManager_stageVolume(t *testing.T) {
 			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
 			ctx := context.Background()
 
-			err := manager.stageVolume(ctx, tc.Volume)
+			err := manager.stageVolume(ctx, tc.Volume, tc.UsageOptions)
 
 			if tc.ExpectedErr != nil {
 				require.EqualError(t, err, tc.ExpectedErr.Error())
@@ -183,6 +193,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
 	cases := []struct {
 		Name                 string
 		Volume               *structs.CSIVolume
+		UsageOptions         *UsageOptions
 		PluginErr            error
 		ExpectedErr          error
 		ExpectedCSICallCount int64
@@ -192,6 +203,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
 			Volume: &structs.CSIVolume{
 				ID: "foo",
 			},
+			UsageOptions:         &UsageOptions{},
 			PluginErr:            errors.New("Some Unknown Error"),
 			ExpectedErr:          errors.New("Some Unknown Error"),
 			ExpectedCSICallCount: 1,
@@ -201,6 +213,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
 			Volume: &structs.CSIVolume{
 				ID: "foo",
 			},
+			UsageOptions:         &UsageOptions{},
 			PluginErr:            nil,
 			ExpectedErr:          nil,
 			ExpectedCSICallCount: 1,
@@ -218,7 +231,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
 			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
 			ctx := context.Background()
 
-			err := manager.unstageVolume(ctx, tc.Volume)
+			err := manager.unstageVolume(ctx, tc.Volume, tc.UsageOptions)
 
 			if tc.ExpectedErr != nil {
 				require.EqualError(t, err, tc.ExpectedErr.Error())
@@ -237,6 +250,7 @@ func TestVolumeManager_publishVolume(t *testing.T) {
 		Name                 string
 		Allocation           *structs.Allocation
 		Volume               *structs.CSIVolume
+		UsageOptions         *UsageOptions
 		PluginErr            error
 		ExpectedErr          error
 		ExpectedCSICallCount int64
@@ -249,6 +263,7 @@ func TestVolumeManager_publishVolume(t *testing.T) {
 				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
 				AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
 			},
+			UsageOptions:         &UsageOptions{},
 			PluginErr:            errors.New("Some Unknown Error"),
 			ExpectedErr:          errors.New("Some Unknown Error"),
 			ExpectedCSICallCount: 1,
@@ -261,6 +276,7 @@ func TestVolumeManager_publishVolume(t *testing.T) {
 				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
 				AccessMode:     structs.CSIVolumeAccessModeMultiNodeMultiWriter,
 			},
+			UsageOptions:         &UsageOptions{},
 			PluginErr:            nil,
 			ExpectedErr:          nil,
 			ExpectedCSICallCount: 1,
@@ -278,7 +294,7 @@ func TestVolumeManager_publishVolume(t *testing.T) {
 			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
 			ctx := context.Background()
 
-			_, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation)
+			_, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, tc.UsageOptions)
 
 			if tc.ExpectedErr != nil {
 				require.EqualError(t, err, tc.ExpectedErr.Error())
@@ -297,6 +313,7 @@ func TestVolumeManager_unpublishVolume(t *testing.T) {
 		Name                 string
 		Allocation           *structs.Allocation
 		Volume               *structs.CSIVolume
+		UsageOptions         *UsageOptions
 		PluginErr            error
 		ExpectedErr          error
 		ExpectedCSICallCount int64
@@ -307,6 +324,7 @@ func TestVolumeManager_unpublishVolume(t *testing.T) {
 			Volume: &structs.CSIVolume{
 				ID: "foo",
 			},
+			UsageOptions:         &UsageOptions{},
 			PluginErr:            errors.New("Some Unknown Error"),
 			ExpectedErr:          errors.New("Some Unknown Error"),
 			ExpectedCSICallCount: 1,
@@ -317,6 +335,7 @@ func TestVolumeManager_unpublishVolume(t *testing.T) {
 			Volume: &structs.CSIVolume{
 				ID: "foo",
 			},
+			UsageOptions:         &UsageOptions{},
 			PluginErr:            nil,
 			ExpectedErr:          nil,
 			ExpectedCSICallCount: 1,
@@ -334,7 +353,7 @@ func TestVolumeManager_unpublishVolume(t *testing.T) {
 			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
 			ctx := context.Background()
 
-			err := manager.unpublishVolume(ctx, tc.Volume, tc.Allocation)
+			err := manager.unpublishVolume(ctx, tc.Volume, tc.Allocation, tc.UsageOptions)
 
 			if tc.ExpectedErr != nil {
 				require.EqualError(t, err, tc.ExpectedErr.Error())
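Reviewer note, not part of the patch: a minimal, self-contained sketch of what the new UsageOptions buys, namely per-usage staging paths. ToFS is copied from the interface.go hunk above; the root directory, the "staging" segment standing in for StagingDirName, the volume ID "foo", and the attachment/access mode strings are illustrative assumptions, not values taken from this diff.

package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// UsageOptions mirrors the struct added in csimanager/interface.go.
type UsageOptions struct {
	ReadOnly       bool
	AttachmentMode string
	AccessMode     string
}

// ToFS is copied from the diff above: it renders the usage options as a
// single filesystem-safe path segment, e.g. "ro-<attachment>-<access>".
func (u *UsageOptions) ToFS() string {
	var sb strings.Builder

	if u.ReadOnly {
		sb.WriteString("ro-")
	} else {
		sb.WriteString("rw-")
	}

	sb.WriteString(u.AttachmentMode)
	sb.WriteString("-")
	sb.WriteString(u.AccessMode)

	return sb.String()
}

func main() {
	// Hypothetical options for a read-only filesystem mount; these mode
	// strings are illustrative, not the structs package constants.
	usage := &UsageOptions{
		ReadOnly:       true,
		AttachmentMode: "file-system",
		AccessMode:     "multi-node-multi-writer",
	}

	// Mirrors stagingDirForVolume: root/<StagingDirName>/<vol.ID>/<usage.ToFS()>,
	// with "/csi", "staging", and "foo" as stand-ins.
	fmt.Println(filepath.Join("/csi", "staging", "foo", usage.ToFS()))
	// Prints: /csi/staging/foo/ro-file-system-multi-node-multi-writer
}

Because the usage options are encoded into the final path segment, the same volume used read-only and read-write stages into two distinct directories, which is what lets UnmountVolume unstage one usage mode without disturbing another allocation's mount of the same volume.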