diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index 43248daf8..a4251e473 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -190,7 +190,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re AccessMode: string(req.AccessMode), } - err = mounter.UnmountVolume(ctx, req.VolumeID, req.AllocID, usageOpts) + err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts) if err != nil { return err } diff --git a/client/pluginmanager/csimanager/interface.go b/client/pluginmanager/csimanager/interface.go index 29f393c5a..343e8d977 100644 --- a/client/pluginmanager/csimanager/interface.go +++ b/client/pluginmanager/csimanager/interface.go @@ -42,7 +42,7 @@ func (u *UsageOptions) ToFS() string { type VolumeMounter interface { MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error) - UnmountVolume(ctx context.Context, volID, allocID string, usageOpts *UsageOptions) error + UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error } type Manager interface { diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index d300e805a..10c39fa8d 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -259,7 +259,7 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, // once for each staging path that a volume has been staged under. // It is safe to call multiple times and a plugin is required to return OK if // the volume has been unstaged or was never staged on the node. -func (v *volumeManager) unstageVolume(ctx context.Context, volID string, usage *UsageOptions) error { +func (v *volumeManager) unstageVolume(ctx context.Context, volID, remoteID string, usage *UsageOptions) error { logger := hclog.FromContext(ctx) logger.Trace("Unstaging volume") stagingPath := v.stagingDirForVolume(v.containerMountPoint, volID, usage) @@ -267,7 +267,7 @@ func (v *volumeManager) unstageVolume(ctx context.Context, volID string, usage * // CSI NodeUnstageVolume errors for timeout, codes.Unavailable and // codes.ResourceExhausted are retried; all other errors are fatal. return v.plugin.NodeUnstageVolume(ctx, - volID, + remoteID, stagingPath, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), grpc_retry.WithMax(3), @@ -288,12 +288,12 @@ func combineErrors(maybeErrs ...error) error { return result.ErrorOrNil() } -func (v *volumeManager) unpublishVolume(ctx context.Context, volID, allocID string, usage *UsageOptions) error { +func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) error { pluginTargetPath := v.allocDirForVolume(v.containerMountPoint, volID, allocID, usage) // CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and // codes.ResourceExhausted are retried; all other errors are fatal. - rpcErr := v.plugin.NodeUnpublishVolume(ctx, volID, pluginTargetPath, + rpcErr := v.plugin.NodeUnpublishVolume(ctx, remoteID, pluginTargetPath, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), grpc_retry.WithMax(3), grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)), @@ -325,16 +325,16 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, allocID stri return rpcErr } -func (v *volumeManager) UnmountVolume(ctx context.Context, volID, allocID string, usage *UsageOptions) (err error) { +func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) { logger := v.logger.With("volume_id", volID, "alloc_id", allocID) ctx = hclog.WithContext(ctx, logger) - err = v.unpublishVolume(ctx, volID, allocID, usage) + err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage) if err == nil { canRelease := v.usageTracker.Free(allocID, volID, usage) if v.requiresStaging && canRelease { - err = v.unstageVolume(ctx, volID, usage) + err = v.unstageVolume(ctx, volID, remoteID, usage) } } diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go index 025c66739..4922cddbe 100644 --- a/client/pluginmanager/csimanager/volume_test.go +++ b/client/pluginmanager/csimanager/volume_test.go @@ -236,7 +236,8 @@ func TestVolumeManager_unstageVolume(t *testing.T) { manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true) ctx := context.Background() - err := manager.unstageVolume(ctx, tc.Volume.ID, tc.UsageOptions) + err := manager.unstageVolume(ctx, + tc.Volume.ID, tc.Volume.RemoteID(), tc.UsageOptions) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error()) @@ -416,7 +417,8 @@ func TestVolumeManager_unpublishVolume(t *testing.T) { manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true) ctx := context.Background() - err := manager.unpublishVolume(ctx, tc.Volume.ID, tc.Allocation.ID, tc.UsageOptions) + err := manager.unpublishVolume(ctx, + tc.Volume.ID, tc.Volume.RemoteID(), tc.Allocation.ID, tc.UsageOptions) if tc.ExpectedErr != nil { require.EqualError(t, err, tc.ExpectedErr.Error()) @@ -476,7 +478,7 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) { require.Equal(t, "true", e.Details["success"]) events = events[1:] - err = manager.UnmountVolume(ctx, vol.ID, alloc.ID, usage) + err = manager.UnmountVolume(ctx, vol.ID, vol.RemoteID(), alloc.ID, usage) require.NoError(t, err) require.Equal(t, 1, len(events)) diff --git a/client/structs/csi.go b/client/structs/csi.go index f76ab4ebf..99f0b0773 100644 --- a/client/structs/csi.go +++ b/client/structs/csi.go @@ -31,7 +31,7 @@ type CSIControllerQuery struct { } type ClientCSIControllerValidateVolumeRequest struct { - VolumeID string + VolumeID string // note: this is the external ID AttachmentMode structs.CSIVolumeAttachmentMode AccessMode structs.CSIVolumeAccessMode @@ -43,7 +43,7 @@ type ClientCSIControllerValidateVolumeResponse struct { } type ClientCSIControllerAttachVolumeRequest struct { - // The ID of the volume to be used on a node. + // The external ID of the volume to be used on a node. // This field is REQUIRED. VolumeID string @@ -137,10 +137,11 @@ type ClientCSIControllerDetachVolumeResponse struct{} // a Nomad client to tell a CSI node plugin on that client to perform // NodeUnpublish and NodeUnstage. type ClientCSINodeDetachVolumeRequest struct { - PluginID string // ID of the plugin that manages the volume (required) - VolumeID string // ID of the volume to be unpublished (required) - AllocID string // ID of the allocation we're unpublishing for (required) - NodeID string // ID of the Nomad client targeted + PluginID string // ID of the plugin that manages the volume (required) + VolumeID string // ID of the volume to be unpublished (required) + AllocID string // ID of the allocation we're unpublishing for (required) + NodeID string // ID of the Nomad client targeted + ExternalID string // External ID of the volume to be unpublished (required) // These fields should match the original volume request so that // we can find the mount points on the client diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 55c765d10..fa64df8af 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -862,7 +862,8 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i // operations or releasing the claim. nReq := &cstructs.ClientCSINodeDetachVolumeRequest{ PluginID: args.plug.ID, - VolumeID: vol.RemoteID(), + VolumeID: vol.ID, + ExternalID: vol.RemoteID(), AllocID: args.allocID, NodeID: nodeID, AttachmentMode: vol.AttachmentMode, @@ -880,13 +881,30 @@ func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]i // on the node need it, but we also only want to make this // call at most once per node if vol.ControllerRequired && args.nodeClaims[nodeID] < 1 { + + // we need to get the CSI Node ID, which is not the same as + // the Nomad Node ID + ws := memdb.NewWatchSet() + targetNode, err := srv.State().NodeByID(ws, nodeID) + if err != nil { + return args.nodeClaims, err + } + if targetNode == nil { + return args.nodeClaims, fmt.Errorf("%s: %s", + structs.ErrUnknownNodePrefix, nodeID) + } + targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID] + if !ok { + return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID) + } + controllerNodeID, err := nodeForControllerPlugin(srv.State(), args.plug) - if err != nil || nodeID == "" { + if err != nil || controllerNodeID == "" { return args.nodeClaims, err } cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{ VolumeID: vol.RemoteID(), - ClientCSINodeID: nodeID, + ClientCSINodeID: targetCSIInfo.NodeInfo.ID, } cReq.PluginID = args.plug.ID cReq.ControllerNodeID = controllerNodeID diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 32d1cad04..890999192 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2364,7 +2364,7 @@ func TestCSI_GCVolumeClaims_Reap(t *testing.T) { ClaimsCount: map[string]int{node.ID: 1}, ControllerRequired: true, ExpectedErr: fmt.Sprintf( - "no controllers available for plugin %q", plugin.ID), + "Unknown node: %s", node.ID), ExpectedClaimsCount: 0, ExpectedNodeDetachVolumeCount: 1, ExpectedControllerDetachVolumeCount: 0,