diff --git a/.changelog/20532.txt b/.changelog/20532.txt new file mode 100644 index 000000000..0c734cace --- /dev/null +++ b/.changelog/20532.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where volumes in different namespaces but the same ID would fail to stage on the same client +``` diff --git a/client/allocrunner/csi_hook.go b/client/allocrunner/csi_hook.go index 6aac12124..775a06912 100644 --- a/client/allocrunner/csi_hook.go +++ b/client/allocrunner/csi_hook.go @@ -323,6 +323,7 @@ func (c *csiHook) claimVolumes(results map[string]*volumePublishResult) error { // populate data we'll write later to disk result.stub.VolumeID = resp.Volume.ID + result.stub.VolumeNamespace = resp.Volume.Namespace result.stub.VolumeExternalID = resp.Volume.RemoteID() result.stub.PluginID = resp.Volume.PluginID result.publishContext = resp.PublishContext @@ -532,7 +533,8 @@ func (c *csiHook) unmountImpl(result *volumePublishResult) error { } return manager.UnmountVolume(c.shutdownCtx, - result.stub.VolumeID, result.stub.VolumeExternalID, c.alloc.ID, usageOpts) + result.stub.VolumeNamespace, result.stub.VolumeID, + result.stub.VolumeExternalID, c.alloc.ID, usageOpts) } // Shutdown will get called when the client is gracefully diff --git a/client/allocrunner/csi_hook_test.go b/client/allocrunner/csi_hook_test.go index 83d28be7c..457ad1020 100644 --- a/client/allocrunner/csi_hook_test.go +++ b/client/allocrunner/csi_hook_test.go @@ -4,6 +4,7 @@ package allocrunner import ( + "context" "errors" "fmt" "testing" @@ -21,7 +22,6 @@ import ( "github.com/hashicorp/nomad/plugins/drivers" "github.com/hashicorp/nomad/testutil" "github.com/shoenig/test/must" - "github.com/stretchr/testify/require" ) var _ interfaces.RunnerPrerunHook = (*csiHook)(nil) @@ -31,30 +31,41 @@ func TestCSIHook(t *testing.T) { ci.Parallel(t) alloc := mock.Alloc() + + volID := "volID0" + volName := "volName" + pluginID := "plugin_id" + + // expected by most of the tests testMountSrc := fmt.Sprintf( - 
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID) - logger := testlog.HCLogger(t) + "test-alloc-dir/%s/ns/%s/ro-file-system-single-node-reader-only", alloc.ID, volID) testcases := []struct { - name string - volumeRequests map[string]*structs.VolumeRequest - startsUnschedulable bool - startsWithClaims bool - startsWithStubs map[string]*state.CSIVolumeStub - startsWithValidMounts bool - failsFirstUnmount bool - expectedClaimErr error - expectedMounts map[string]*csimanager.MountInfo - expectedCalls map[string]int + name string + volumeRequests map[string]*structs.VolumeRequest + rpcNS string // namespace of volume, as returned by server in Claim + + startsUnschedulable bool // claim will fail + startsWithClaims bool // claim exists on server + startsWithStubs bool // mount info is written to client state + startsWithValidMounts bool // mounts were created + startingStub *state.CSIVolumeStub // mount info used in starting mounts/stubs + startingVolumeNS string // namespace of volume previously mounted + + failsFirstUnmount bool + expectedClaimErr error + expectedMounts map[string]*csimanager.MountInfo + expectedCalls map[string]int }{ { - name: "simple case", + name: "simple case", + rpcNS: "ns", volumeRequests: map[string]*structs.VolumeRequest{ - "vol0": { - Name: "vol0", + volName: { + Name: volName, Type: structs.VolumeTypeCSI, - Source: "testvolume0", + Source: volID, ReadOnly: true, AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, @@ -63,19 +74,20 @@ func TestCSIHook(t *testing.T) { }, }, expectedMounts: map[string]*csimanager.MountInfo{ - "vol0": &csimanager.MountInfo{Source: testMountSrc}, + volName: &csimanager.MountInfo{Source: testMountSrc}, }, expectedCalls: map[string]int{ "claim": 1, "MountVolume": 1, "UnmountVolume": 1, "unpublish": 1}, }, { - name: "per-alloc case", + name: "per-alloc case", + rpcNS: "ns", volumeRequests: 
map[string]*structs.VolumeRequest{ - "vol0": { - Name: "vol0", + volName: { + Name: volName, Type: structs.VolumeTypeCSI, - Source: "testvolume0", + Source: volID, ReadOnly: true, AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, @@ -84,19 +96,20 @@ func TestCSIHook(t *testing.T) { }, }, expectedMounts: map[string]*csimanager.MountInfo{ - "vol0": &csimanager.MountInfo{Source: testMountSrc}, + volName: &csimanager.MountInfo{Source: testMountSrc}, }, expectedCalls: map[string]int{ "claim": 1, "MountVolume": 1, "UnmountVolume": 1, "unpublish": 1}, }, { - name: "fatal error on claim", + name: "fatal error on claim", + rpcNS: "ns", volumeRequests: map[string]*structs.VolumeRequest{ - "vol0": { - Name: "vol0", + volName: { + Name: volName, Type: structs.VolumeTypeCSI, - Source: "testvolume0", + Source: volID, ReadOnly: true, AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, @@ -105,21 +118,19 @@ func TestCSIHook(t *testing.T) { }, }, startsUnschedulable: true, - expectedMounts: map[string]*csimanager.MountInfo{ - "vol0": &csimanager.MountInfo{Source: testMountSrc}, - }, - expectedCalls: map[string]int{"claim": 1}, + expectedCalls: map[string]int{"claim": 1, "UnmountVolume": 1, "unpublish": 1}, expectedClaimErr: errors.New( - "claiming volumes: could not claim volume testvolume0: volume is currently unschedulable"), + "claiming volumes: could not claim volume volID0: volume is currently unschedulable"), }, { - name: "retryable error on claim", + name: "retryable error on claim", + rpcNS: "ns", volumeRequests: map[string]*structs.VolumeRequest{ - "vol0": { - Name: "vol0", + volName: { + Name: volName, Type: structs.VolumeTypeCSI, - Source: "testvolume0", + Source: volID, ReadOnly: true, AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, @@ -129,18 +140,20 @@ func TestCSIHook(t 
*testing.T) { }, startsWithClaims: true, expectedMounts: map[string]*csimanager.MountInfo{ - "vol0": &csimanager.MountInfo{Source: testMountSrc}, + volName: &csimanager.MountInfo{Source: testMountSrc}, }, expectedCalls: map[string]int{ "claim": 2, "MountVolume": 1, "UnmountVolume": 1, "unpublish": 1}, }, + { - name: "already mounted", + name: "already mounted", + rpcNS: "ns", volumeRequests: map[string]*structs.VolumeRequest{ - "vol0": { - Name: "vol0", + volName: { + Name: volName, Type: structs.VolumeTypeCSI, - Source: "testvolume0", + Source: volID, ReadOnly: true, AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, @@ -148,25 +161,30 @@ func TestCSIHook(t *testing.T) { PerAlloc: false, }, }, - startsWithStubs: map[string]*state.CSIVolumeStub{"vol0": { - VolumeID: "vol0", - PluginID: "vol0-plugin", - ExternalNodeID: "i-example", - MountInfo: &csimanager.MountInfo{Source: testMountSrc}, - }}, + startsWithStubs: true, startsWithValidMounts: true, + startingVolumeNS: "ns", + startingStub: &state.CSIVolumeStub{ + VolumeID: volID, + VolumeNamespace: "ns", + PluginID: pluginID, + ExternalNodeID: "i-example", + MountInfo: &csimanager.MountInfo{Source: testMountSrc}, + }, expectedMounts: map[string]*csimanager.MountInfo{ - "vol0": &csimanager.MountInfo{Source: testMountSrc}, + volName: &csimanager.MountInfo{Source: testMountSrc}, }, expectedCalls: map[string]int{"HasMount": 1, "UnmountVolume": 1, "unpublish": 1}, }, + { - name: "existing but invalid mounts", + name: "existing but invalid mounts", + rpcNS: "ns", volumeRequests: map[string]*structs.VolumeRequest{ - "vol0": { - Name: "vol0", + volName: { + Name: volName, Type: structs.VolumeTypeCSI, - Source: "testvolume0", + Source: volID, ReadOnly: true, AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, @@ -174,27 +192,33 @@ func TestCSIHook(t *testing.T) { PerAlloc: false, }, }, - 
startsWithStubs: map[string]*state.CSIVolumeStub{"vol0": { - VolumeID: "testvolume0", - PluginID: "vol0-plugin", - ExternalNodeID: "i-example", - MountInfo: &csimanager.MountInfo{Source: testMountSrc}, - }}, + // same as case above, but the stub only exists in the client state + // db and not actually on-disk (ex. after host reboot) + startsWithStubs: true, startsWithValidMounts: false, + startingVolumeNS: "ns", + startingStub: &state.CSIVolumeStub{ + VolumeID: volID, + VolumeNamespace: "ns", + PluginID: pluginID, + ExternalNodeID: "i-example", + MountInfo: &csimanager.MountInfo{Source: testMountSrc}, + }, expectedMounts: map[string]*csimanager.MountInfo{ - "vol0": &csimanager.MountInfo{Source: testMountSrc}, + volName: &csimanager.MountInfo{Source: testMountSrc}, }, expectedCalls: map[string]int{ "HasMount": 1, "claim": 1, "MountVolume": 1, "UnmountVolume": 1, "unpublish": 1}, }, { - name: "retry on failed unmount", + name: "retry on failed unmount", + rpcNS: "ns", volumeRequests: map[string]*structs.VolumeRequest{ - "vol0": { - Name: "vol0", + volName: { + Name: volName, Type: structs.VolumeTypeCSI, - Source: "testvolume0", + Source: volID, ReadOnly: true, AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, @@ -204,12 +228,78 @@ func TestCSIHook(t *testing.T) { }, failsFirstUnmount: true, expectedMounts: map[string]*csimanager.MountInfo{ - "vol0": &csimanager.MountInfo{Source: testMountSrc}, + volName: &csimanager.MountInfo{Source: testMountSrc}, }, expectedCalls: map[string]int{ "claim": 1, "MountVolume": 1, "UnmountVolume": 2, "unpublish": 2}, }, + { + name: "client upgrade from version with missing namespace", + rpcNS: "", + volumeRequests: map[string]*structs.VolumeRequest{ + volName: { + Name: volName, + Type: structs.VolumeTypeCSI, + Source: volID, + ReadOnly: true, + AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + 
MountOptions: &structs.CSIMountOptions{}, + PerAlloc: false, + }, + }, + startsWithStubs: true, + startsWithValidMounts: true, + startingVolumeNS: "", // note: existing mount has no namespace + startingStub: &state.CSIVolumeStub{ + VolumeID: volID, + VolumeNamespace: "", + PluginID: pluginID, + ExternalNodeID: "i-example", + MountInfo: &csimanager.MountInfo{Source: fmt.Sprintf( + "test-alloc-dir/%s/volID0/ro-file-system-single-node-reader-only", alloc.ID)}, + }, + expectedMounts: map[string]*csimanager.MountInfo{ + volName: &csimanager.MountInfo{Source: fmt.Sprintf( + "test-alloc-dir/%s/volID0/ro-file-system-single-node-reader-only", alloc.ID)}, + }, + expectedCalls: map[string]int{"HasMount": 1, "UnmountVolume": 1, "unpublish": 1}, + }, + + { + name: "server upgrade from version with missing namespace", + rpcNS: "ns", + volumeRequests: map[string]*structs.VolumeRequest{ + volName: { + Name: volName, + Type: structs.VolumeTypeCSI, + Source: volID, + ReadOnly: true, + AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + MountOptions: &structs.CSIMountOptions{}, + PerAlloc: false, + }, + }, + startsWithStubs: true, + startsWithValidMounts: true, + startingVolumeNS: "", // note: existing mount has no namespace + startingStub: &state.CSIVolumeStub{ + VolumeID: volName, + VolumeNamespace: "", + PluginID: pluginID, + ExternalNodeID: "i-example", + MountInfo: &csimanager.MountInfo{Source: fmt.Sprintf( + "test-alloc-dir/%s/volID0/ro-file-system-single-node-reader-only", alloc.ID)}, + }, + expectedMounts: map[string]*csimanager.MountInfo{ + volName: &csimanager.MountInfo{Source: fmt.Sprintf( + "test-alloc-dir/%s/volID0/ro-file-system-single-node-reader-only", alloc.ID)}, + }, + expectedCalls: map[string]int{"HasMount": 1, "UnmountVolume": 1, "unpublish": 1}, + }, + { name: "should not run", volumeRequests: map[string]*structs.VolumeRequest{}, @@ -229,6 +319,7 @@ func TestCSIHook(t *testing.T) { mgr := 
&csimanager.MockCSIManager{VM: vm} rpcer := mockRPCer{ alloc: alloc, + ns: tc.rpcNS, callCounts: callCounts, hasExistingClaim: pointer.Of(tc.startsWithClaims), schedulable: pointer.Of(!tc.startsUnschedulable), @@ -239,9 +330,10 @@ func TestCSIHook(t *testing.T) { FSIsolation: drivers.FSIsolationChroot, MountConfigs: drivers.MountConfigSupportAll, }, - stubs: tc.startsWithStubs, + stubs: make(map[string]*state.CSIVolumeStub), } + logger := testlog.HCLogger(t) hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret") hook.minBackoffInterval = 1 * time.Millisecond hook.maxBackoffInterval = 10 * time.Millisecond @@ -249,29 +341,45 @@ func TestCSIHook(t *testing.T) { must.NotNil(t, hook) + if tc.startsWithStubs { + // write a fake mount stub to the "client state" + ar.stubs[volName] = tc.startingStub + ar.SetCSIVolumes(map[string]*state.CSIVolumeStub{volName: tc.startingStub}) + } + if tc.startsWithValidMounts { - // TODO: this works, but it requires knowledge of how the mock works. 
would rather vm.MountVolume() - vm.Mounts = map[string]bool{ - tc.expectedMounts["vol0"].Source: true, - } + // create a fake mount + req := tc.volumeRequests[volName] + vol := rpcer.testVolume(req.Source, tc.startingVolumeNS) + _, err := vm.MountVolume(context.TODO(), vol, alloc, + &csimanager.UsageOptions{ + ReadOnly: req.ReadOnly, + AttachmentMode: req.AttachmentMode, + AccessMode: req.AccessMode, + }, nil) + must.NoError(t, err) + vm.CallCounter.Reset() } if tc.failsFirstUnmount { vm.NextUnmountVolumeErr = errors.New("bad first attempt") } + err := hook.Prerun() if tc.expectedClaimErr != nil { - must.EqError(t, hook.Prerun(), tc.expectedClaimErr.Error()) - mounts := ar.res.GetCSIMounts() - must.Nil(t, mounts) + must.EqError(t, err, tc.expectedClaimErr.Error()) } else { - must.NoError(t, hook.Prerun()) - mounts := ar.res.GetCSIMounts() - must.MapEq(t, tc.expectedMounts, mounts, - must.Sprintf("got mounts: %v", mounts)) - must.NoError(t, hook.Postrun()) + must.NoError(t, err) } + mounts := ar.res.GetCSIMounts() + must.MapEq(t, tc.expectedMounts, mounts, + must.Sprintf("got mounts: %v", mounts)) + + // even if we failed to mount in the first place, we should get no + // errors from Postrun + must.NoError(t, hook.Postrun()) + if tc.failsFirstUnmount { // retrying the unmount doesn't block Postrun, so give it time // to run once more before checking the call counts to ensure @@ -281,7 +389,7 @@ func TestCSIHook(t *testing.T) { counts := callCounts.Get() must.MapEq(t, tc.expectedCalls, counts, - must.Sprintf("got calls: %v", counts)) + must.Sprintf("got calls: %v\n\texpected: %v", counts, tc.expectedCalls)) }) } @@ -293,13 +401,16 @@ func TestCSIHook(t *testing.T) { func TestCSIHook_Prerun_Validation(t *testing.T) { ci.Parallel(t) + volID := "volID0" + volName := "volName" + alloc := mock.Alloc() logger := testlog.HCLogger(t) volumeRequests := map[string]*structs.VolumeRequest{ - "vol0": { - Name: "vol0", + volName: { + Name: volName, Type: structs.VolumeTypeCSI, - 
Source: "testvolume0", + Source: volID, ReadOnly: true, AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, @@ -364,17 +475,17 @@ func TestCSIHook_Prerun_Validation(t *testing.T) { } hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret") - require.NotNil(t, hook) + must.NotNil(t, hook) if tc.expectedErr != "" { - require.EqualError(t, hook.Prerun(), tc.expectedErr) + must.EqError(t, hook.Prerun(), tc.expectedErr) mounts := ar.res.GetCSIMounts() - require.Nil(t, mounts) + must.Nil(t, mounts) } else { - require.NoError(t, hook.Prerun()) + must.NoError(t, hook.Prerun()) mounts := ar.res.GetCSIMounts() - require.NotNil(t, mounts) - require.NoError(t, hook.Postrun()) + must.NotNil(t, mounts) + must.NoError(t, hook.Postrun()) } }) } @@ -384,6 +495,7 @@ func TestCSIHook_Prerun_Validation(t *testing.T) { type mockRPCer struct { alloc *structs.Allocation + ns string callCounts *testutil.CallCounter hasExistingClaim *bool schedulable *bool @@ -395,7 +507,7 @@ func (r mockRPCer) RPC(method string, args any, reply any) error { case "CSIVolume.Claim": r.callCounts.Inc("claim") req := args.(*structs.CSIVolumeClaimRequest) - vol := r.testVolume(req.VolumeID) + vol := r.testVolume(req.VolumeID, r.ns) err := vol.Claim(req.ToClaim(), r.alloc) // after the first claim attempt is made, reset the volume's claims as @@ -425,10 +537,11 @@ func (r mockRPCer) RPC(method string, args any, reply any) error { // testVolume is a helper that optionally starts as unschedulable / claimed, so // that we can test retryable vs non-retryable failures -func (r mockRPCer) testVolume(id string) *structs.CSIVolume { +func (r mockRPCer) testVolume(id, ns string) *structs.CSIVolume { vol := structs.NewCSIVolume(id, 0) vol.Schedulable = *r.schedulable vol.PluginID = "plugin-" + id + vol.Namespace = ns vol.RequestedCapabilities = []*structs.CSIVolumeCapability{ { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, diff --git 
a/client/allocrunner/state/state.go b/client/allocrunner/state/state.go index ed681bb68..ed5865b4c 100644 --- a/client/allocrunner/state/state.go +++ b/client/allocrunner/state/state.go @@ -86,6 +86,7 @@ type AllocVolumes struct { // relevant data that we need to persist about the volume. type CSIVolumeStub struct { VolumeID string + VolumeNamespace string VolumeExternalID string PluginID string ExternalNodeID string diff --git a/client/csi_endpoint.go b/client/csi_endpoint.go index dcbcd70e5..9d32f27cd 100644 --- a/client/csi_endpoint.go +++ b/client/csi_endpoint.go @@ -527,7 +527,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re AccessMode: req.AccessMode, } - err = manager.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts) + err = manager.UnmountVolume(ctx, req.VolumeNamespace, req.VolumeID, req.ExternalID, req.AllocID, usageOpts) if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) { // if the unmounting previously happened but the server failed to // checkpoint, we'll get an error from Unmount but can safely @@ -565,7 +565,7 @@ func (c *CSI) NodeExpandVolume(req *structs.ClientCSINodeExpandVolumeRequest, re return err } - newCapacity, err := manager.ExpandVolume(ctx, + newCapacity, err := manager.ExpandVolume(ctx, req.VolumeNamespace, req.VolumeID, req.ExternalID, req.Claim.AllocationID, usageOpts, req.Capacity) if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) { diff --git a/client/pluginmanager/csimanager/interface.go b/client/pluginmanager/csimanager/interface.go index 526df7515..85f7e78ae 100644 --- a/client/pluginmanager/csimanager/interface.go +++ b/client/pluginmanager/csimanager/interface.go @@ -56,9 +56,9 @@ func (u *UsageOptions) ToFS() string { type VolumeManager interface { MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error) - UnmountVolume(ctx 
context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error + UnmountVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions) error HasMount(ctx context.Context, mountInfo *MountInfo) (bool, error) - ExpandVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error) + ExpandVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error) ExternalID() string } diff --git a/client/pluginmanager/csimanager/testing.go b/client/pluginmanager/csimanager/testing.go index 88a5055a5..c3e0927f9 100644 --- a/client/pluginmanager/csimanager/testing.go +++ b/client/pluginmanager/csimanager/testing.go @@ -55,8 +55,8 @@ type MockVolumeManager struct { LastExpandVolumeCall *MockExpandVolumeCall } -func (m *MockVolumeManager) mountName(volID, allocID string, usageOpts *UsageOptions) string { - return filepath.Join("test-alloc-dir", allocID, volID, usageOpts.ToFS()) +func (m *MockVolumeManager) mountName(volNS, volID, allocID string, usageOpts *UsageOptions) string { + return filepath.Join("test-alloc-dir", allocID, volNS, volID, usageOpts.ToFS()) } func (m *MockVolumeManager) MountVolume(_ context.Context, vol *nstructs.CSIVolume, alloc *nstructs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error) { @@ -74,7 +74,7 @@ func (m *MockVolumeManager) MountVolume(_ context.Context, vol *nstructs.CSIVolu if m.Mounts == nil { m.Mounts = make(map[string]bool) } - source := m.mountName(vol.ID, alloc.ID, usageOpts) + source := m.mountName(vol.Namespace, vol.ID, alloc.ID, usageOpts) m.Mounts[source] = true return &MountInfo{ @@ -82,7 +82,7 @@ func (m *MockVolumeManager) MountVolume(_ context.Context, vol *nstructs.CSIVolu }, nil } -func (m *MockVolumeManager) UnmountVolume(_ context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error { +func 
(m *MockVolumeManager) UnmountVolume(_ context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions) error { if m.CallCounter != nil { m.CallCounter.Inc("UnmountVolume") } @@ -94,7 +94,7 @@ func (m *MockVolumeManager) UnmountVolume(_ context.Context, volID, remoteID, al } // "unmount" it - delete(m.Mounts, m.mountName(volID, allocID, usageOpts)) + delete(m.Mounts, m.mountName(volNS, volID, allocID, usageOpts)) return nil } @@ -108,17 +108,17 @@ func (m *MockVolumeManager) HasMount(_ context.Context, mountInfo *MountInfo) (b return m.Mounts[mountInfo.Source], nil } -func (m *MockVolumeManager) ExpandVolume(_ context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error) { +func (m *MockVolumeManager) ExpandVolume(_ context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error) { m.LastExpandVolumeCall = &MockExpandVolumeCall{ - volID, remoteID, allocID, usageOpts, capacity, + volNS, volID, remoteID, allocID, usageOpts, capacity, } return capacity.RequiredBytes, m.NextExpandVolumeErr } type MockExpandVolumeCall struct { - VolID, RemoteID, AllocID string - UsageOpts *UsageOptions - Capacity *csi.CapacityRange + VolNS, VolID, RemoteID, AllocID string + UsageOpts *UsageOptions + Capacity *csi.CapacityRange } func (m *MockVolumeManager) ExternalID() string { diff --git a/client/pluginmanager/csimanager/usage_tracker.go b/client/pluginmanager/csimanager/usage_tracker.go index 1fd1f2b2a..dbfe76808 100644 --- a/client/pluginmanager/csimanager/usage_tracker.go +++ b/client/pluginmanager/csimanager/usage_tracker.go @@ -22,6 +22,7 @@ func newVolumeUsageTracker() *volumeUsageTracker { type volumeUsageKey struct { id string + ns string usageOpts UsageOptions } @@ -51,21 +52,21 @@ func (v *volumeUsageTracker) removeAlloc(key volumeUsageKey, needle string) { } } -func (v *volumeUsageTracker) Claim(allocID, volID string, usage *UsageOptions) { 
+func (v *volumeUsageTracker) Claim(allocID, volID, volNS string, usage *UsageOptions) { v.stateMu.Lock() defer v.stateMu.Unlock() - key := volumeUsageKey{id: volID, usageOpts: *usage} + key := volumeUsageKey{id: volID, ns: volNS, usageOpts: *usage} v.appendAlloc(key, allocID) } // Free removes the allocation from the state list for the given alloc. If the // alloc is the last allocation for the volume then it returns true. -func (v *volumeUsageTracker) Free(allocID, volID string, usage *UsageOptions) bool { +func (v *volumeUsageTracker) Free(allocID, volID, volNS string, usage *UsageOptions) bool { v.stateMu.Lock() defer v.stateMu.Unlock() - key := volumeUsageKey{id: volID, usageOpts: *usage} + key := volumeUsageKey{id: volID, ns: volNS, usageOpts: *usage} v.removeAlloc(key, allocID) allocs := v.allocsForKey(key) return len(allocs) == 0 diff --git a/client/pluginmanager/csimanager/usage_tracker_test.go b/client/pluginmanager/csimanager/usage_tracker_test.go index 36f05e26a..dda795c5b 100644 --- a/client/pluginmanager/csimanager/usage_tracker_test.go +++ b/client/pluginmanager/csimanager/usage_tracker_test.go @@ -47,16 +47,17 @@ func TestUsageTracker(t *testing.T) { tracker := newVolumeUsageTracker() volume := &structs.CSIVolume{ - ID: "foo", + ID: "foo", + Namespace: "bar", } for _, alloc := range tc.RegisterAllocs { - tracker.Claim(alloc.ID, volume.ID, &UsageOptions{}) + tracker.Claim(alloc.ID, volume.Namespace, volume.ID, &UsageOptions{}) } result := false for _, alloc := range tc.FreeAllocs { - result = tracker.Free(alloc.ID, volume.ID, &UsageOptions{}) + result = tracker.Free(alloc.ID, volume.Namespace, volume.ID, &UsageOptions{}) } require.Equal(t, tc.ExpectedResult, result, "Tracker State: %#v", tracker.state) diff --git a/client/pluginmanager/csimanager/volume.go b/client/pluginmanager/csimanager/volume.go index f243e226b..6396dfbad 100644 --- a/client/pluginmanager/csimanager/volume.go +++ b/client/pluginmanager/csimanager/volume.go @@ -7,6 +7,7 @@ import 
( "context" "errors" "fmt" + "io/fs" "os" "path/filepath" "strings" @@ -73,8 +74,8 @@ func newVolumeManager(logger hclog.Logger, eventer TriggerNodeEvent, plugin csi. } } -func (v *volumeManager) stagingDirForVolume(root string, volID string, usage *UsageOptions) string { - return filepath.Join(root, StagingDirName, volID, usage.ToFS()) +func (v *volumeManager) stagingDirForVolume(root string, volNS, volID string, usage *UsageOptions) string { + return filepath.Join(root, StagingDirName, volNS, volID, usage.ToFS()) } func (v *volumeManager) allocDirForVolume(root string, volID, allocID string) string { @@ -92,22 +93,22 @@ func (v *volumeManager) targetForVolume(root string, volID, allocID string, usag // Returns whether the directory is a pre-existing mountpoint, the staging path, // and any errors that occurred. func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume, usage *UsageOptions) (string, bool, error) { - stagingPath := v.stagingDirForVolume(v.mountRoot, vol.ID, usage) + hostStagingPath := v.stagingDirForVolume(v.mountRoot, vol.Namespace, vol.ID, usage) // Make the staging path, owned by the Nomad User - if err := os.MkdirAll(stagingPath, 0700); err != nil && !os.IsExist(err) { + if err := os.MkdirAll(hostStagingPath, 0700); err != nil && !os.IsExist(err) { return "", false, fmt.Errorf("failed to create staging directory for volume (%s): %v", vol.ID, err) } // Validate that it is not already a mount point m := mount.New() - isNotMount, err := m.IsNotAMountPoint(stagingPath) + isNotMount, err := m.IsNotAMountPoint(hostStagingPath) if err != nil { return "", false, fmt.Errorf("mount point detection failed for volume (%s): %v", vol.ID, err) } - return stagingPath, !isNotMount, nil + return hostStagingPath, !isNotMount, nil } // ensureAllocDir attempts to create a directory for use when publishing a volume @@ -167,7 +168,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, if err != nil { return err } - 
pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol.ID, usage) + pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol.Namespace, vol.ID, usage) logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "host_staging_path", hostStagingPath, "plugin_staging_path", pluginStagingPath) @@ -203,7 +204,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum logger := hclog.FromContext(ctx) var pluginStagingPath string if v.requiresStaging { - pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol.ID, usage) + pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol.Namespace, vol.ID, usage) } hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc, usage) @@ -261,7 +262,7 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, } if err == nil { - v.usageTracker.Claim(alloc.ID, vol.ID, usage) + v.usageTracker.Claim(alloc.ID, vol.ID, vol.Namespace, usage) } event := structs.NewNodeEvent(). @@ -284,10 +285,26 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, // once for each staging path that a volume has been staged under. // It is safe to call multiple times and a plugin is required to return OK if // the volume has been unstaged or was never staged on the node. 
-func (v *volumeManager) unstageVolume(ctx context.Context, volID, remoteID string, usage *UsageOptions) error { +func (v *volumeManager) unstageVolume(ctx context.Context, volNS, volID, remoteID string, usage *UsageOptions) error { logger := hclog.FromContext(ctx) - logger.Trace("Unstaging volume") - stagingPath := v.stagingDirForVolume(v.containerMountPoint, volID, usage) + + // This is the staging path inside the container, which we pass to the + // plugin to perform unstaging + stagingPath := v.stagingDirForVolume(v.containerMountPoint, volNS, volID, usage) + + // This is the path from the host, which we need to use to verify whether + // the path is the right one to pass to the plugin container + hostStagingPath := v.stagingDirForVolume(v.mountRoot, volNS, volID, usage) + _, err := os.Stat(hostStagingPath) + if err != nil && errors.Is(err, fs.ErrNotExist) { + // COMPAT: it's possible to get an unmount request that includes the + // namespace even for volumes that were mounted before the path included + // the namespace, so if the staging path doesn't exist, try the older + // path + stagingPath = v.stagingDirForVolume(v.containerMountPoint, "", volID, usage) + } + + logger.Trace("unstaging volume", "staging_path", stagingPath) // CSI NodeUnstageVolume errors for timeout, codes.Unavailable and // codes.ResourceExhausted are retried; all other errors are fatal. @@ -316,6 +333,9 @@ func combineErrors(maybeErrs ...error) error { func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) error { pluginTargetPath := v.targetForVolume(v.containerMountPoint, volID, allocID, usage) + logger := hclog.FromContext(ctx) + logger.Trace("unpublishing volume", "plugin_target_path", pluginTargetPath) + // CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and // codes.ResourceExhausted are retried; all other errors are fatal. 
rpcErr := v.plugin.NodeUnpublishVolume(ctx, remoteID, pluginTargetPath, @@ -335,6 +355,8 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al return rpcErr } + logger.Trace("removing host path", "host_target_path", hostTargetPath) + // Host Target Path was not cleaned up, attempt to do so here. If it's still // a mount then removing the dir will fail and we'll return any rpcErr and the // file error. @@ -349,16 +371,17 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr) } -func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) { - logger := v.logger.With("volume_id", volID, "alloc_id", allocID) +func (v *volumeManager) UnmountVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usage *UsageOptions) (err error) { + logger := v.logger.With("volume_id", volID, "ns", volNS, "alloc_id", allocID) ctx = hclog.WithContext(ctx, logger) + logger.Trace("unmounting volume") err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage) if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) { - canRelease := v.usageTracker.Free(allocID, volID, usage) + canRelease := v.usageTracker.Free(allocID, volID, volNS, usage) if v.requiresStaging && canRelease { - err = v.unstageVolume(ctx, volID, remoteID, usage) + err = v.unstageVolume(ctx, volNS, volID, remoteID, usage) } } @@ -384,7 +407,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo } // ExpandVolume sends a NodeExpandVolume request to the node plugin -func (v *volumeManager) ExpandVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions, capacity *csi.CapacityRange) (newCapacity int64, err error) { +func (v *volumeManager) ExpandVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usage *UsageOptions, capacity *csi.CapacityRange) 
(newCapacity int64, err error) { capability, err := csi.VolumeCapabilityFromStructs(usage.AttachmentMode, usage.AccessMode, usage.MountOptions) if err != nil { // nil may be acceptable, so let the node plugin decide. @@ -392,12 +415,22 @@ func (v *volumeManager) ExpandVolume(ctx context.Context, volID, remoteID, alloc "volume_id", volID, "alloc_id", allocID, "error", err) } + stagingPath := v.stagingDirForVolume(v.containerMountPoint, volNS, volID, usage) + _, err = os.Stat(stagingPath) + if err != nil && errors.Is(err, fs.ErrNotExist) { + // COMPAT: it's possible to get an expand request that includes the + // namespace even for volumes that were mounted before the path included + // the namespace, so if the staging path doesn't exist, try the older + // path + stagingPath = v.stagingDirForVolume(v.containerMountPoint, "", volID, usage) + } + req := &csi.NodeExpandVolumeRequest{ ExternalVolumeID: remoteID, CapacityRange: capacity, Capability: capability, TargetPath: v.targetForVolume(v.containerMountPoint, volID, allocID, usage), - StagingPath: v.stagingDirForVolume(v.containerMountPoint, volID, usage), + StagingPath: stagingPath, } resp, err := v.plugin.NodeExpandVolume(ctx, req, grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout), diff --git a/client/pluginmanager/csimanager/volume_test.go b/client/pluginmanager/csimanager/volume_test.go index fc10d4f73..1b8fd4a69 100644 --- a/client/pluginmanager/csimanager/volume_test.go +++ b/client/pluginmanager/csimanager/volume_test.go @@ -94,7 +94,8 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) { eventer := func(e *structs.NodeEvent) {} manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake, tmpPath, tmpPath, true, "i-example") - expectedStagingPath := manager.stagingDirForVolume(tmpPath, tc.Volume.ID, tc.UsageOptions) + expectedStagingPath := manager.stagingDirForVolume(tmpPath, + tc.Volume.Namespace, tc.Volume.ID, tc.UsageOptions) if tc.CreateDirAheadOfTime { err := 
os.MkdirAll(expectedStagingPath, 0700) @@ -258,6 +259,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) { ctx := context.Background() err := manager.unstageVolume(ctx, + tc.Volume.Namespace, tc.Volume.ID, tc.Volume.RemoteID(), tc.UsageOptions) if tc.ExpectedErr != nil { @@ -514,7 +516,7 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) { require.Equal(t, "true", e.Details["success"]) events = events[1:] - err = manager.UnmountVolume(ctx, vol.ID, vol.RemoteID(), alloc.ID, usage) + err = manager.UnmountVolume(ctx, vol.Namespace, vol.ID, vol.RemoteID(), alloc.ID, usage) require.NoError(t, err) require.Equal(t, 1, len(events)) diff --git a/client/structs/csi.go b/client/structs/csi.go index 86f2812cb..4078e0ac9 100644 --- a/client/structs/csi.go +++ b/client/structs/csi.go @@ -440,11 +440,12 @@ type ClientCSIControllerListSnapshotsResponse struct { // a Nomad client to tell a CSI node plugin on that client to perform // NodeUnpublish and NodeUnstage. type ClientCSINodeDetachVolumeRequest struct { - PluginID string // ID of the plugin that manages the volume (required) - VolumeID string // ID of the volume to be unpublished (required) - AllocID string // ID of the allocation we're unpublishing for (required) - NodeID string // ID of the Nomad client targeted - ExternalID string // External ID of the volume to be unpublished (required) + PluginID string // ID of the plugin that manages the volume (required) + VolumeID string // ID of the volume to be unpublished (required) + VolumeNamespace string // Namespace of the volume to be unpublished (required) + AllocID string // ID of the allocation we're unpublishing for (required) + NodeID string // ID of the Nomad client targeted + ExternalID string // External ID of the volume to be unpublished (required) // These fields should match the original volume request so that // we can find the mount points on the client @@ -459,9 +460,10 @@ type ClientCSINodeDetachVolumeResponse struct{} // a Nomad client to tell a 
CSI node plugin on that client to perform // NodeExpandVolume. type ClientCSINodeExpandVolumeRequest struct { - PluginID string // ID of the plugin that manages the volume (required) - VolumeID string // ID of the volume to be expanded (required) - ExternalID string // External ID of the volume to be expanded (required) + PluginID string // ID of the plugin that manages the volume (required) + VolumeID string // ID of the volume to be expanded (required) + VolumeNamespace string // Namespace of the volume to be expanded (required) + ExternalID string // External ID of the volume to be expanded (required) // Capacity range (required) to be sent to the node plugin Capacity *csi.CapacityRange diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index c51d2a312..afa7aaa0c 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -845,14 +845,15 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc } req := &cstructs.ClientCSINodeDetachVolumeRequest{ - PluginID: vol.PluginID, - VolumeID: vol.ID, - ExternalID: vol.RemoteID(), - AllocID: claim.AllocationID, - NodeID: claim.NodeID, - AttachmentMode: claim.AttachmentMode, - AccessMode: claim.AccessMode, - ReadOnly: claim.Mode == structs.CSIVolumeClaimRead, + PluginID: vol.PluginID, + VolumeID: vol.ID, + VolumeNamespace: vol.Namespace, + ExternalID: vol.RemoteID(), + AllocID: claim.AllocationID, + NodeID: claim.NodeID, + AttachmentMode: claim.AttachmentMode, + AccessMode: claim.AccessMode, + ReadOnly: claim.Mode == structs.CSIVolumeClaimRead, } err := v.srv.RPC("ClientCSI.NodeDetachVolume", req, &cstructs.ClientCSINodeDetachVolumeResponse{}) @@ -1295,11 +1296,12 @@ func (v *CSIVolume) nodeExpandVolume(vol *structs.CSIVolume, plugin *structs.CSI resp := &cstructs.ClientCSINodeExpandVolumeResponse{} req := &cstructs.ClientCSINodeExpandVolumeRequest{ - PluginID: plugin.ID, - VolumeID: vol.ID, - ExternalID: vol.ExternalID, - Capacity: capacity, - Claim: claim, + PluginID: plugin.ID, + 
VolumeID: vol.ID, + VolumeNamespace: vol.Namespace, + ExternalID: vol.ExternalID, + Capacity: capacity, + Claim: claim, } if err := v.srv.RPC("ClientCSI.NodeExpandVolume", req, resp); err != nil { mErr.Errors = append(mErr.Errors, err) diff --git a/testutil/mock_calls.go b/testutil/mock_calls.go index 5b37832bc..37fa7c670 100644 --- a/testutil/mock_calls.go +++ b/testutil/mock_calls.go @@ -33,6 +33,12 @@ func (c *CallCounter) Get() map[string]int { return maps.Clone(c.counts) } +func (c *CallCounter) Reset() { + c.lock.Lock() + defer c.lock.Unlock() + c.counts = make(map[string]int) +} + func (c *CallCounter) AssertCalled(t testing.T, name string) { t.Helper() counts := c.Get()