csi: Claim CSI Volumes during csi_hook.Prerun

This commit is the initial implementation of claiming volumes from the
server; it passes any publishContext information through to the CSI plugin as appropriate.

There's nothing too fancy here.
Danielle Lancashire
2020-02-17 13:57:25 +01:00
committed by Tim Gross
parent 5aa4490ac6
commit 4a105a6679
4 changed files with 62 additions and 13 deletions
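
For background, the publishContext is the opaque map of plugin-specific data that a CSI controller plugin returns from ControllerPublishVolume; the node plugin needs that same map again during NodeStageVolume and NodePublishVolume, so Nomad has to carry it from the server's claim response down to the node RPCs. A purely illustrative example of the kind of map a cloud block-storage plugin might produce (the key and value below are hypothetical, not from this commit):

	// Hypothetical publishContext from a controller publish step. Nomad
	// treats the contents as opaque and passes them through unchanged.
	publishContext := map[string]string{
		"devicePath": "/dev/xvdba", // e.g. where the controller attached the disk
	}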


@@ -32,7 +32,7 @@ func (c *csiHook) Prerun() error {
 	}
 	ctx := context.TODO()
-	volumes, err := c.csiVolumesFromAlloc()
+	volumes, err := c.claimVolumesFromAlloc()
 	if err != nil {
 		return err
 	}
@@ -50,7 +50,7 @@
 			AccessMode: string(pair.volume.AccessMode),
 		}
-		mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts)
+		mountInfo, err := mounter.MountVolume(ctx, pair.volume, c.alloc, usageOpts, pair.publishContext)
 		if err != nil {
 			return err
 		}
@@ -109,11 +109,60 @@ func (c *csiHook) Postrun() error {
 type volumeAndRequest struct {
 	volume  *structs.CSIVolume
 	request *structs.VolumeRequest
+
+	// When volumeAndRequest was returned from a volume claim, this field will be
+	// populated for plugins that require it.
+	publishContext map[string]string
 }
+
+// claimVolumesFromAlloc is used by the pre-run hook to fetch all of the volume
+// metadata and claim it for use by this alloc/node at the same time.
+func (c *csiHook) claimVolumesFromAlloc() (map[string]*volumeAndRequest, error) {
+	result := make(map[string]*volumeAndRequest)
+	tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
+
+	// Initially, populate the result map with all of the requests
+	for alias, volumeRequest := range tg.Volumes {
+		if volumeRequest.Type == structs.VolumeTypeCSI {
+			result[alias] = &volumeAndRequest{request: volumeRequest}
+		}
+	}
+
+	// Iterate over the result map and upsert the volume field as each volume gets
+	// claimed by the server.
+	for alias, pair := range result {
+		claimType := structs.CSIVolumeClaimWrite
+		if pair.request.ReadOnly {
+			claimType = structs.CSIVolumeClaimRead
+		}
+
+		req := &structs.CSIVolumeClaimRequest{
+			VolumeID:     pair.request.Source,
+			AllocationID: c.alloc.ID,
+			Claim:        claimType,
+		}
+		req.Region = c.alloc.Job.Region
+
+		var resp structs.CSIVolumeClaimResponse
+		if err := c.rpcClient.RPC("CSIVolume.Claim", req, &resp); err != nil {
+			return nil, err
+		}
+
+		if resp.Volume == nil {
+			return nil, fmt.Errorf("Unexpected nil volume returned for ID: %v", pair.request.Source)
+		}
+
+		result[alias].volume = resp.Volume
+		result[alias].publishContext = resp.PublishContext
+	}
+
+	return result, nil
+}
+
 // csiVolumesFromAlloc finds all the CSI Volume requests from the allocation's
 // task group and then fetches them from the Nomad Server, before returning
-// them in the form of map[RequestedAlias]*structs.CSIVolume.
+// them in the form of map[RequestedAlias]*volumeAndRequest. This allows us to
+// thread the request context through to determine usage options for each volume.
 //
 // If any volume fails to validate then we return an error.
 func (c *csiHook) csiVolumesFromAlloc() (map[string]*volumeAndRequest, error) {
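
Because CSIVolume.Claim is the only server round-trip in this new path, claimVolumesFromAlloc is easy to exercise against a stub. A minimal sketch, assuming c.rpcClient accepts any value with the RPC(method string, args, reply interface{}) error shape used above; the stubRPCer type and its canned response are illustrative, not part of this commit:

	// stubRPCer fakes the server side of CSIVolume.Claim for a unit test.
	type stubRPCer struct {
		vol *structs.CSIVolume
	}

	func (s *stubRPCer) RPC(method string, args interface{}, reply interface{}) error {
		// claimVolumesFromAlloc only issues CSIVolume.Claim, so hand back a
		// canned response carrying a volume and a plugin-style publish context.
		resp := reply.(*structs.CSIVolumeClaimResponse)
		resp.Volume = s.vol
		resp.PublishContext = map[string]string{"devicePath": "/dev/fake"}
		return nil
	}

A test could then build a csiHook around the stub, call claimVolumesFromAlloc, and assert that each returned pair carries both the volume and the publish context.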


@@ -45,7 +45,7 @@ func (u *UsageOptions) ToFS() string {
 }
 type VolumeMounter interface {
-	MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions) (*MountInfo, error)
+	MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error)
 	UnmountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions) error
 }
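
Widening an interface method like this forces every VolumeMounter implementation to change in the same commit, or the build breaks. A compile-time assertion is the usual Go idiom for making that drift explicit; a sketch (not part of this diff), assuming it lives in the same package as volumeManager:

	// Fails to compile if volumeManager ever stops satisfying VolumeMounter,
	// e.g. when an interface method gains a parameter such as publishContext.
	var _ VolumeMounter = (*volumeManager)(nil)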


@@ -120,7 +120,7 @@ func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Al
 // stageVolume prepares a volume for use by allocations. When a plugin exposes
 // the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a
 // given usage mode before the volume can be NodePublish-ed.
-func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions) error {
+func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions, publishContext map[string]string) error {
 	logger := hclog.FromContext(ctx)
 	logger.Trace("Preparing volume staging environment")
 	hostStagingPath, isMount, err := v.ensureStagingDir(vol, usage)
@@ -148,7 +148,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
 	// https://github.com/container-storage-interface/spec/blob/4731db0e0bc53238b93850f43ab05d9355df0fd9/spec.md#nodestagevolume-errors
 	return v.plugin.NodeStageVolume(ctx,
 		vol.ID,
-		nil, /* TODO: Get publishContext from Server */
+		publishContext,
 		pluginStagingPath,
 		capability,
 		grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
@@ -157,7 +157,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
 	)
 }
-func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (*MountInfo, error) {
+func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions, publishContext map[string]string) (*MountInfo, error) {
 	logger := hclog.FromContext(ctx)
 	var pluginStagingPath string
 	if v.requiresStaging {
@@ -182,7 +182,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
 	err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
 		VolumeID:          vol.ID,
-		PublishContext:    nil, // TODO: get publishcontext from server
+		PublishContext:    publishContext,
 		StagingTargetPath: pluginStagingPath,
 		TargetPath:        pluginTargetPath,
 		VolumeCapability:  capabilities,
@@ -200,17 +200,17 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
 // configuration for the provided allocation.
 //
 // TODO: Validate remote volume attachment and implement.
-func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (*MountInfo, error) {
+func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions, publishContext map[string]string) (*MountInfo, error) {
 	logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
 	ctx = hclog.WithContext(ctx, logger)
 	if v.requiresStaging {
-		if err := v.stageVolume(ctx, vol, usage); err != nil {
+		if err := v.stageVolume(ctx, vol, usage, publishContext); err != nil {
 			return nil, err
 		}
 	}
-	mountInfo, err := v.publishVolume(ctx, vol, alloc, usage)
+	mountInfo, err := v.publishVolume(ctx, vol, alloc, usage, publishContext)
 	if err != nil {
 		return nil, err
 	}


@@ -177,7 +177,7 @@ func TestVolumeManager_stageVolume(t *testing.T) {
 			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
 			ctx := context.Background()
-			err := manager.stageVolume(ctx, tc.Volume, tc.UsageOptions)
+			err := manager.stageVolume(ctx, tc.Volume, tc.UsageOptions, nil)
 			if tc.ExpectedErr != nil {
 				require.EqualError(t, err, tc.ExpectedErr.Error())
@@ -294,7 +294,7 @@ func TestVolumeManager_publishVolume(t *testing.T) {
 			manager := newVolumeManager(testlog.HCLogger(t), csiFake, tmpPath, tmpPath, true)
 			ctx := context.Background()
-			_, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, tc.UsageOptions)
+			_, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, tc.UsageOptions, nil)
 			if tc.ExpectedErr != nil {
 				require.EqualError(t, err, tc.ExpectedErr.Error())
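
Passing nil for the new argument keeps the existing table-driven cases compiling and valid, but the pass-through itself goes unasserted. A sketch of a follow-up assertion, assuming the fake CSI client were extended to record the context it receives (the LastPublishContext field is hypothetical, not an existing field of the fake):

	// Hypothetical extra assertion: prove the publishContext survives the
	// trip into the plugin's NodeStageVolume call.
	publishContext := map[string]string{"devicePath": "/dev/fake"}
	err := manager.stageVolume(ctx, tc.Volume, tc.UsageOptions, publishContext)
	require.NoError(t, err)
	require.Equal(t, publishContext, csiFake.LastPublishContext) // hypothetical recording field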