From 4a105a66797b249dbe936366c83bf249cfb7eb7b Mon Sep 17 00:00:00 2001 From: Danielle Lancashire Date: Mon, 17 Feb 2020 13:57:25 +0100 Subject: [PATCH] csi: Claim CSI Volumes during csi_hook.Prerun This commit is the initial implementation of claiming volumes from the server and passes through any publishContext information as appropriate. There's nothing too fancy here. --- client/allocrunner/csi_hook.go | 55 ++++++++++++++++++- client/pluginmanager/csimanager/interface.go | 2 +- client/pluginmanager/csimanager/volume.go | 14 ++--- .../pluginmanager/csimanager/volume_test.go | 4 +- 4 files changed, 62 insertions(+), 13 deletions(-) diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 409c4822a..8e6c05397 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -32,7 +32,7 @@ func (c *csiHook) Prerun() error { } ctx := context.TODO() - volumes, err := c.csiVolumesFromAlloc() + volumes, err := c.claimVolumesFromAlloc() if err != nil { return err } @@ -50,7 +50,7 @@ func (c *csiHook) Prerun() error { AccessMode: string(pair.volume.AccessMode), } - mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts) + mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts, pair.publishContext) if err != nil { return err } @@ -109,11 +109,60 @@ func (c *csiHook) Postrun() error { type volumeAndRequest struct { volume *structs.CSIVolume request *structs.VolumeRequest + + // When volumeAndRequest was returned from a volume claim, this field will be + // populated for plugins that require it. + publishContext map[string]string +} + +// claimVolumesFromAlloc is used by the pre-run hook to fetch all of the volume +// metadata and claim it for use by this alloc/node at the same time. +func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error) { + result := make(map[string]*volumeAndRequest) + tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup) + + // Initially, populate the result map with all of the requests + for alias, volumeRequest := range tg.Volumes { + if volumeRequest.Type == structs.VolumeTypeCSI { + result[alias] = &volumeAndRequest{request: volumeRequest} + } + } + + // Iterate over the result map and upsert the volume field as each volume gets + // claimed by the server. + for alias, pair := range result { + claimType := structs.CSIVolumeClaimWrite + if pair.request.ReadOnly { + claimType = structs.CSIVolumeClaimRead + } + + req := &structs.CSIVolumeClaimRequest{ + VolumeID: pair.request.Source, + AllocationID: c.alloc.ID, + Claim: claimType, + } + req.Region = c.alloc.Job.Region + + var resp structs.CSIVolumeClaimResponse + if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil { + return nil, err + } + + if resp.Volume == nil { + return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source) + } + + result[alias].volume = resp.Volume + result[alias].publishContext = resp.PublishContext + } + + return result, nil } // csiVolumesFromAlloc finds all the CSI Volume requests from the allocation's // task group and then fetches them from the Nomad Server, before returning -// them in the form of map[RequestedAlias]*structs.CSIVolume. +// them in the form of map[RequestedAlias]*volumeAndReqest. This allows us to +// thread the request context through to determine usage options for each volume. // // If any volume fails to validate then we return an error. func (c *csiHook) csiVolumesFromAlloc() (map[string]*volumeAndRequest, error) { diff --git a/client/pluginmanager/csimanager/interface.go b/client/pluginmanager/csimanager/interface.go index 96cc05940..f2458fd11 100644 --- a/client/pluginmanager/csimanager/interface.go +++ b/client/pluginmanager/csimanager/interface.go @@ -45,7 +45,7 @@ func (u *UsageOptions) ToFS() string { } type VolumeMounter interface { - MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions) (*MountInfo, error) + MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error) UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions) error } diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index 12393748b..48b116e97 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -120,7 +120,7 @@ func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Al // stageVolume prepares a volume for use by allocations. When a plugin exposes // the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a // given usage mode before the volume can be NodePublish-ed. -func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions) error { +func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions, publishContext map[string]string) error { logger := hclog.FromContext(ctx) logger.Trace("Preparing volume staging environment") hostStagingPath, isMount, err := v.ensureStagingDir(vol, usage) @@ -148,7 +148,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, // https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors return v.plugin.NodeStageVolume(ctx, vol.ID, - nil, /* TODO: Get publishContext from Server */ + publishContext, pluginStagingPath, capability, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), @@ -157,7 +157,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, ) } -func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (*MountInfo, error) { +func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions, publishContext map[string]string) (*MountInfo, error) { logger := hclog.FromContext(ctx) var pluginStagingPath string if v.requiresStaging { @@ -182,7 +182,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{ VolumeID: vol.ID, - PublishContext: nil, // TODO: get publishcontext from server + PublishContext: publishContext, StagingTargetPath: pluginStagingPath, TargetPath: pluginTargetPath, VolumeCapability: capabilities, @@ -200,17 +200,17 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum // configuration for the provided allocation. // // TODO: Validate remote volume attachment and implement. -func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (*MountInfo, error) { +func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions, publishContext map[string]string) (*MountInfo, error) { logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID) ctx = hclog.WithContext(ctx, logger) if v.requiresStaging { - if err := v.stageVolume(ctx, vol, usage); err != nil { + if err := v.stageVolume(ctx, vol, usage, publishContext); err != nil { return nil, err } } - mountInfo, err := v.publishVolume(ctx, vol, alloc, usage) + mountInfo, err := v.publishVolume(ctx, vol, alloc, usage, publishContext) if err != nil { return nil, err } diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go index 689bd86a7..148c55249 100644 --- a/client/pluginmanager/csimanager/volume_test.go +++ b/client/pluginmanager/csimanager/volume_test.go @@ -177,7 +177,7 @@ func TestVolumeManager_stageVolume(t *testing.T) { manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) ctx := context.Background() - err := manager.stageVolume(ctx, tc.Volume, tc.UsageOptions) + err := manager.stageVolume(ctx, tc.Volume, tc.UsageOptions, nil) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error()) @@ -294,7 +294,7 @@ func TestVolumeManager_publishVolume(t *testing.T) { manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true) ctx := context.Background() - _, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, tc.UsageOptions) + _, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, tc.UsageOptions, nil) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error())