CSI: include volume namespace in staging path (#20532)

CSI volumes are namespaced, but the client does not include the namespace in the
staging mount path. This causes CSI volumes with the same volume ID but
different namespaces to collide if they happen to be placed on the same host.
The per-allocation paths don't need to be namespaced, because an allocation can
only mount volumes from its job's own namespace.

Rework the CSI hook tests to have more fine-grained control over the mock
on-disk state. Add tests covering upgrades from staging paths missing
namespaces.
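
The upgrade coverage hinges on a small stat-and-fallback check. A minimal,
self-contained sketch of that logic, assuming a simplified path helper (the
real code lives in unstageVolume and ExpandVolume below and distinguishes the
host mount root from the container mount point):

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
	"os"
	"path/filepath"
)

// resolveStagingPath is a hypothetical helper: prefer the namespaced staging
// path, but if it was never created (the volume was staged by a pre-upgrade
// client), fall back to the legacy layout that omitted the namespace.
func resolveStagingPath(root, volNS, volID, usageFS string) string {
	namespaced := filepath.Join(root, "staging", volNS, volID, usageFS)
	if _, err := os.Stat(namespaced); errors.Is(err, fs.ErrNotExist) {
		// filepath.Join skips empty segments, so passing "" for the
		// namespace reproduces the pre-upgrade path
		return filepath.Join(root, "staging", "", volID, usageFS)
	}
	return namespaced
}

func main() {
	fmt.Println(resolveStagingPath("/csi", "ns", "volID0", "ro-file-system-single-node-reader-only"))
}
```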

Fixes: https://github.com/hashicorp/nomad/issues/18741
Author:  Tim Gross (committed via GitHub)
Date:    2024-05-13 11:24:09 -04:00
Parent:  623486b302
Commit:  65ae61249c
14 changed files with 318 additions and 152 deletions

.changelog/20532.txt (new file, +3 lines)

@@ -0,0 +1,3 @@
```release-note:bug
csi: Fixed a bug where volumes in different namespaces but the same ID would fail to stage on the same client
```

View File

@@ -323,6 +323,7 @@ func (c *csiHook) claimVolumes(results map[string]*volumePublishResult) error {
// populate data we'll write later to disk
result.stub.VolumeID = resp.Volume.ID
result.stub.VolumeNamespace = resp.Volume.Namespace
result.stub.VolumeExternalID = resp.Volume.RemoteID()
result.stub.PluginID = resp.Volume.PluginID
result.publishContext = resp.PublishContext
@@ -532,7 +533,8 @@ func (c *csiHook) unmountImpl(result *volumePublishResult) error {
}
return manager.UnmountVolume(c.shutdownCtx,
result.stub.VolumeID, result.stub.VolumeExternalID, c.alloc.ID, usageOpts)
result.stub.VolumeNamespace, result.stub.VolumeID,
result.stub.VolumeExternalID, c.alloc.ID, usageOpts)
}
// Shutdown will get called when the client is gracefully

View File

@@ -4,6 +4,7 @@
package allocrunner
import (
"context"
"errors"
"fmt"
"testing"
@@ -21,7 +22,6 @@ import (
"github.com/hashicorp/nomad/plugins/drivers"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/stretchr/testify/require"
)
var _ interfaces.RunnerPrerunHook = (*csiHook)(nil)
@@ -31,30 +31,41 @@ func TestCSIHook(t *testing.T) {
ci.Parallel(t)
alloc := mock.Alloc()
volID := "volID0"
volName := "volName"
pluginID := "plugin_id"
// expected by most of the tests
testMountSrc := fmt.Sprintf(
"test-alloc-dir/%s/testvolume0/ro-file-system-single-node-reader-only", alloc.ID)
logger := testlog.HCLogger(t)
"test-alloc-dir/%s/ns/%s/ro-file-system-single-node-reader-only", alloc.ID, volID)
testcases := []struct {
name string
volumeRequests map[string]*structs.VolumeRequest
startsUnschedulable bool
startsWithClaims bool
startsWithStubs map[string]*state.CSIVolumeStub
startsWithValidMounts bool
failsFirstUnmount bool
expectedClaimErr error
expectedMounts map[string]*csimanager.MountInfo
expectedCalls map[string]int
name string
volumeRequests map[string]*structs.VolumeRequest
rpcNS string // namespace of volume, as returned by server in Claim
startsUnschedulable bool // claim will fail
startsWithClaims bool // claim exists on server
startsWithStubs bool // mount info is written to client state
startsWithValidMounts bool // mounts were created
startingStub *state.CSIVolumeStub // mount info used in starting mounts/stubs
startingVolumeNS string // namespace of volume previously mounted
failsFirstUnmount bool
expectedClaimErr error
expectedMounts map[string]*csimanager.MountInfo
expectedCalls map[string]int
}{
{
name: "simple case",
name: "simple case",
rpcNS: "ns",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
@@ -63,19 +74,20 @@ func TestCSIHook(t *testing.T) {
},
},
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: testMountSrc},
volName: &csimanager.MountInfo{Source: testMountSrc},
},
expectedCalls: map[string]int{
"claim": 1, "MountVolume": 1, "UnmountVolume": 1, "unpublish": 1},
},
{
name: "per-alloc case",
name: "per-alloc case",
rpcNS: "ns",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
@@ -84,19 +96,20 @@ func TestCSIHook(t *testing.T) {
},
},
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: testMountSrc},
volName: &csimanager.MountInfo{Source: testMountSrc},
},
expectedCalls: map[string]int{
"claim": 1, "MountVolume": 1, "UnmountVolume": 1, "unpublish": 1},
},
{
name: "fatal error on claim",
name: "fatal error on claim",
rpcNS: "ns",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
@@ -105,21 +118,19 @@ func TestCSIHook(t *testing.T) {
},
},
startsUnschedulable: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: testMountSrc},
},
expectedCalls: map[string]int{"claim": 1},
expectedCalls: map[string]int{"claim": 1, "UnmountVolume": 1, "unpublish": 1},
expectedClaimErr: errors.New(
"claiming volumes: could not claim volume testvolume0: volume is currently unschedulable"),
"claiming volumes: could not claim volume volID0: volume is currently unschedulable"),
},
{
name: "retryable error on claim",
name: "retryable error on claim",
rpcNS: "ns",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
@@ -129,18 +140,20 @@ func TestCSIHook(t *testing.T) {
},
startsWithClaims: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: testMountSrc},
volName: &csimanager.MountInfo{Source: testMountSrc},
},
expectedCalls: map[string]int{
"claim": 2, "MountVolume": 1, "UnmountVolume": 1, "unpublish": 1},
},
{
name: "already mounted",
name: "already mounted",
rpcNS: "ns",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
@@ -148,25 +161,30 @@ func TestCSIHook(t *testing.T) {
PerAlloc: false,
},
},
startsWithStubs: map[string]*state.CSIVolumeStub{"vol0": {
VolumeID: "vol0",
PluginID: "vol0-plugin",
ExternalNodeID: "i-example",
MountInfo: &csimanager.MountInfo{Source: testMountSrc},
}},
startsWithStubs: true,
startsWithValidMounts: true,
startingVolumeNS: "ns",
startingStub: &state.CSIVolumeStub{
VolumeID: volID,
VolumeNamespace: "ns",
PluginID: pluginID,
ExternalNodeID: "i-example",
MountInfo: &csimanager.MountInfo{Source: testMountSrc},
},
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: testMountSrc},
volName: &csimanager.MountInfo{Source: testMountSrc},
},
expectedCalls: map[string]int{"HasMount": 1, "UnmountVolume": 1, "unpublish": 1},
},
{
name: "existing but invalid mounts",
name: "existing but invalid mounts",
rpcNS: "ns",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
@@ -174,27 +192,33 @@ func TestCSIHook(t *testing.T) {
PerAlloc: false,
},
},
startsWithStubs: map[string]*state.CSIVolumeStub{"vol0": {
VolumeID: "testvolume0",
PluginID: "vol0-plugin",
ExternalNodeID: "i-example",
MountInfo: &csimanager.MountInfo{Source: testMountSrc},
}},
// same as case above, but the stub only exists in the client state
// db and not actually on-disk (ex. after host reboot)
startsWithStubs: true,
startsWithValidMounts: false,
startingVolumeNS: "ns",
startingStub: &state.CSIVolumeStub{
VolumeID: volID,
VolumeNamespace: "ns",
PluginID: pluginID,
ExternalNodeID: "i-example",
MountInfo: &csimanager.MountInfo{Source: testMountSrc},
},
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: testMountSrc},
volName: &csimanager.MountInfo{Source: testMountSrc},
},
expectedCalls: map[string]int{
"HasMount": 1, "claim": 1, "MountVolume": 1, "UnmountVolume": 1, "unpublish": 1},
},
{
name: "retry on failed unmount",
name: "retry on failed unmount",
rpcNS: "ns",
volumeRequests: map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
@@ -204,12 +228,78 @@ func TestCSIHook(t *testing.T) {
},
failsFirstUnmount: true,
expectedMounts: map[string]*csimanager.MountInfo{
"vol0": &csimanager.MountInfo{Source: testMountSrc},
volName: &csimanager.MountInfo{Source: testMountSrc},
},
expectedCalls: map[string]int{
"claim": 1, "MountVolume": 1, "UnmountVolume": 2, "unpublish": 2},
},
{
name: "client upgrade from version with missing namespace",
rpcNS: "",
volumeRequests: map[string]*structs.VolumeRequest{
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{},
PerAlloc: false,
},
},
startsWithStubs: true,
startsWithValidMounts: true,
startingVolumeNS: "", // note: existing mount has no namespace
startingStub: &state.CSIVolumeStub{
VolumeID: volID,
VolumeNamespace: "",
PluginID: pluginID,
ExternalNodeID: "i-example",
MountInfo: &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/volID0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMounts: map[string]*csimanager.MountInfo{
volName: &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/volID0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedCalls: map[string]int{"HasMount": 1, "UnmountVolume": 1, "unpublish": 1},
},
{
name: "server upgrade from version with missing namespace",
rpcNS: "ns",
volumeRequests: map[string]*structs.VolumeRequest{
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{},
PerAlloc: false,
},
},
startsWithStubs: true,
startsWithValidMounts: true,
startingVolumeNS: "", // note: existing mount has no namespace
startingStub: &state.CSIVolumeStub{
VolumeID: volName,
VolumeNamespace: "",
PluginID: pluginID,
ExternalNodeID: "i-example",
MountInfo: &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/volID0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedMounts: map[string]*csimanager.MountInfo{
volName: &csimanager.MountInfo{Source: fmt.Sprintf(
"test-alloc-dir/%s/volID0/ro-file-system-single-node-reader-only", alloc.ID)},
},
expectedCalls: map[string]int{"HasMount": 1, "UnmountVolume": 1, "unpublish": 1},
},
{
name: "should not run",
volumeRequests: map[string]*structs.VolumeRequest{},
@@ -229,6 +319,7 @@ func TestCSIHook(t *testing.T) {
mgr := &csimanager.MockCSIManager{VM: vm}
rpcer := mockRPCer{
alloc: alloc,
ns: tc.rpcNS,
callCounts: callCounts,
hasExistingClaim: pointer.Of(tc.startsWithClaims),
schedulable: pointer.Of(!tc.startsUnschedulable),
@@ -239,9 +330,10 @@ func TestCSIHook(t *testing.T) {
FSIsolation: drivers.FSIsolationChroot,
MountConfigs: drivers.MountConfigSupportAll,
},
stubs: tc.startsWithStubs,
stubs: make(map[string]*state.CSIVolumeStub),
}
logger := testlog.HCLogger(t)
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret")
hook.minBackoffInterval = 1 * time.Millisecond
hook.maxBackoffInterval = 10 * time.Millisecond
@@ -249,29 +341,45 @@ func TestCSIHook(t *testing.T) {
must.NotNil(t, hook)
if tc.startsWithStubs {
// write a fake mount stub to the "client state"
ar.stubs[volName] = tc.startingStub
ar.SetCSIVolumes(map[string]*state.CSIVolumeStub{volName: tc.startingStub})
}
if tc.startsWithValidMounts {
// TODO: this works, but it requires knowledge of how the mock works. would rather vm.MountVolume()
vm.Mounts = map[string]bool{
tc.expectedMounts["vol0"].Source: true,
}
// create a fake mount
req := tc.volumeRequests[volName]
vol := rpcer.testVolume(req.Source, tc.startingVolumeNS)
_, err := vm.MountVolume(context.TODO(), vol, alloc,
&csimanager.UsageOptions{
ReadOnly: req.ReadOnly,
AttachmentMode: req.AttachmentMode,
AccessMode: req.AccessMode,
}, nil)
must.NoError(t, err)
vm.CallCounter.Reset()
}
if tc.failsFirstUnmount {
vm.NextUnmountVolumeErr = errors.New("bad first attempt")
}
err := hook.Prerun()
if tc.expectedClaimErr != nil {
must.EqError(t, hook.Prerun(), tc.expectedClaimErr.Error())
mounts := ar.res.GetCSIMounts()
must.Nil(t, mounts)
must.EqError(t, err, tc.expectedClaimErr.Error())
} else {
must.NoError(t, hook.Prerun())
mounts := ar.res.GetCSIMounts()
must.MapEq(t, tc.expectedMounts, mounts,
must.Sprintf("got mounts: %v", mounts))
must.NoError(t, hook.Postrun())
must.NoError(t, err)
}
mounts := ar.res.GetCSIMounts()
must.MapEq(t, tc.expectedMounts, mounts,
must.Sprintf("got mounts: %v", mounts))
// even if we failed to mount in the first place, we should get no
// errors from Postrun
must.NoError(t, hook.Postrun())
if tc.failsFirstUnmount {
// retrying the unmount doesn't block Postrun, so give it time
// to run once more before checking the call counts to ensure
@@ -281,7 +389,7 @@ func TestCSIHook(t *testing.T) {
counts := callCounts.Get()
must.MapEq(t, tc.expectedCalls, counts,
must.Sprintf("got calls: %v", counts))
must.Sprintf("got calls: %v\n\texpected: %v", counts, tc.expectedCalls))
})
}
@@ -293,13 +401,16 @@ func TestCSIHook(t *testing.T) {
func TestCSIHook_Prerun_Validation(t *testing.T) {
ci.Parallel(t)
volID := "volID0"
volName := "volName"
alloc := mock.Alloc()
logger := testlog.HCLogger(t)
volumeRequests := map[string]*structs.VolumeRequest{
"vol0": {
Name: "vol0",
volName: {
Name: volName,
Type: structs.VolumeTypeCSI,
Source: "testvolume0",
Source: volID,
ReadOnly: true,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
@@ -364,17 +475,17 @@ func TestCSIHook_Prerun_Validation(t *testing.T) {
}
hook := newCSIHook(alloc, logger, mgr, rpcer, ar, ar.res, "secret")
require.NotNil(t, hook)
must.NotNil(t, hook)
if tc.expectedErr != "" {
require.EqualError(t, hook.Prerun(), tc.expectedErr)
must.EqError(t, hook.Prerun(), tc.expectedErr)
mounts := ar.res.GetCSIMounts()
require.Nil(t, mounts)
must.Nil(t, mounts)
} else {
require.NoError(t, hook.Prerun())
must.NoError(t, hook.Prerun())
mounts := ar.res.GetCSIMounts()
require.NotNil(t, mounts)
require.NoError(t, hook.Postrun())
must.NotNil(t, mounts)
must.NoError(t, hook.Postrun())
}
})
}
@@ -384,6 +495,7 @@ func TestCSIHook_Prerun_Validation(t *testing.T) {
type mockRPCer struct {
alloc *structs.Allocation
ns string
callCounts *testutil.CallCounter
hasExistingClaim *bool
schedulable *bool
@@ -395,7 +507,7 @@ func (r mockRPCer) RPC(method string, args any, reply any) error {
case "CSIVolume.Claim":
r.callCounts.Inc("claim")
req := args.(*structs.CSIVolumeClaimRequest)
vol := r.testVolume(req.VolumeID)
vol := r.testVolume(req.VolumeID, r.ns)
err := vol.Claim(req.ToClaim(), r.alloc)
// after the first claim attempt is made, reset the volume's claims as
@@ -425,10 +537,11 @@ func (r mockRPCer) RPC(method string, args any, reply any) error {
// testVolume is a helper that optionally starts as unschedulable / claimed, so
// that we can test retryable vs non-retryable failures
func (r mockRPCer) testVolume(id string) *structs.CSIVolume {
func (r mockRPCer) testVolume(id, ns string) *structs.CSIVolume {
vol := structs.NewCSIVolume(id, 0)
vol.Schedulable = *r.schedulable
vol.PluginID = "plugin-" + id
vol.Namespace = ns
vol.RequestedCapabilities = []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,

View File

@@ -86,6 +86,7 @@ type AllocVolumes struct {
// relevant data that we need to persist about the volume.
type CSIVolumeStub struct {
VolumeID string
VolumeNamespace string
VolumeExternalID string
PluginID string
ExternalNodeID string

View File

@@ -527,7 +527,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
AccessMode: req.AccessMode,
}
err = manager.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
err = manager.UnmountVolume(ctx, req.VolumeNamespace, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
// if the unmounting previously happened but the server failed to
// checkpoint, we'll get an error from Unmount but can safely
@@ -565,7 +565,7 @@ func (c *CSI) NodeExpandVolume(req *structs.ClientCSINodeExpandVolumeRequest, re
return err
}
newCapacity, err := manager.ExpandVolume(ctx,
newCapacity, err := manager.ExpandVolume(ctx, req.VolumeNamespace,
req.VolumeID, req.ExternalID, req.Claim.AllocationID, usageOpts, req.Capacity)
if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {

View File

@@ -56,9 +56,9 @@ func (u *UsageOptions) ToFS() string {
type VolumeManager interface {
MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error)
UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error
UnmountVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions) error
HasMount(ctx context.Context, mountInfo *MountInfo) (bool, error)
ExpandVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error)
ExpandVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error)
ExternalID() string
}

View File

@@ -55,8 +55,8 @@ type MockVolumeManager struct {
LastExpandVolumeCall *MockExpandVolumeCall
}
func (m *MockVolumeManager) mountName(volID, allocID string, usageOpts *UsageOptions) string {
return filepath.Join("test-alloc-dir", allocID, volID, usageOpts.ToFS())
func (m *MockVolumeManager) mountName(volNS, volID, allocID string, usageOpts *UsageOptions) string {
return filepath.Join("test-alloc-dir", allocID, volNS, volID, usageOpts.ToFS())
}
func (m *MockVolumeManager) MountVolume(_ context.Context, vol *nstructs.CSIVolume, alloc *nstructs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error) {
@@ -74,7 +74,7 @@ func (m *MockVolumeManager) MountVolume(_ context.Context, vol *nstructs.CSIVolu
if m.Mounts == nil {
m.Mounts = make(map[string]bool)
}
source := m.mountName(vol.ID, alloc.ID, usageOpts)
source := m.mountName(vol.Namespace, vol.ID, alloc.ID, usageOpts)
m.Mounts[source] = true
return &MountInfo{
@@ -82,7 +82,7 @@ func (m *MockVolumeManager) MountVolume(_ context.Context, vol *nstructs.CSIVolu
}, nil
}
func (m *MockVolumeManager) UnmountVolume(_ context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error {
func (m *MockVolumeManager) UnmountVolume(_ context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions) error {
if m.CallCounter != nil {
m.CallCounter.Inc("UnmountVolume")
}
@@ -94,7 +94,7 @@ func (m *MockVolumeManager) UnmountVolume(_ context.Context, volID, remoteID, al
}
// "unmount" it
delete(m.Mounts, m.mountName(volID, allocID, usageOpts))
delete(m.Mounts, m.mountName(volNS, volID, allocID, usageOpts))
return nil
}
@@ -108,17 +108,17 @@ func (m *MockVolumeManager) HasMount(_ context.Context, mountInfo *MountInfo) (b
return m.Mounts[mountInfo.Source], nil
}
func (m *MockVolumeManager) ExpandVolume(_ context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error) {
func (m *MockVolumeManager) ExpandVolume(_ context.Context, volNS, volID, remoteID, allocID string, usageOpts *UsageOptions, capacity *csi.CapacityRange) (int64, error) {
m.LastExpandVolumeCall = &MockExpandVolumeCall{
volID, remoteID, allocID, usageOpts, capacity,
volNS, volID, remoteID, allocID, usageOpts, capacity,
}
return capacity.RequiredBytes, m.NextExpandVolumeErr
}
type MockExpandVolumeCall struct {
VolID, RemoteID, AllocID string
UsageOpts *UsageOptions
Capacity *csi.CapacityRange
VolNS, VolID, RemoteID, AllocID string
UsageOpts *UsageOptions
Capacity *csi.CapacityRange
}
func (m *MockVolumeManager) ExternalID() string {

View File

@@ -22,6 +22,7 @@ func newVolumeUsageTracker() *volumeUsageTracker {
type volumeUsageKey struct {
id string
ns string
usageOpts UsageOptions
}
@@ -51,21 +52,21 @@ func (v *volumeUsageTracker) removeAlloc(key volumeUsageKey, needle string) {
}
}
func (v *volumeUsageTracker) Claim(allocID, volID string, usage *UsageOptions) {
func (v *volumeUsageTracker) Claim(allocID, volID, volNS string, usage *UsageOptions) {
v.stateMu.Lock()
defer v.stateMu.Unlock()
key := volumeUsageKey{id: volID, usageOpts: *usage}
key := volumeUsageKey{id: volID, ns: volNS, usageOpts: *usage}
v.appendAlloc(key, allocID)
}
// Free removes the allocation from the state list for the given alloc. If the
// alloc is the last allocation for the volume then it returns true.
func (v *volumeUsageTracker) Free(allocID, volID string, usage *UsageOptions) bool {
func (v *volumeUsageTracker) Free(allocID, volID, volNS string, usage *UsageOptions) bool {
v.stateMu.Lock()
defer v.stateMu.Unlock()
key := volumeUsageKey{id: volID, usageOpts: *usage}
key := volumeUsageKey{id: volID, ns: volNS, usageOpts: *usage}
v.removeAlloc(key, allocID)
allocs := v.allocsForKey(key)
return len(allocs) == 0

View File

@@ -47,16 +47,17 @@ func TestUsageTracker(t *testing.T) {
tracker := newVolumeUsageTracker()
volume := &structs.CSIVolume{
ID: "foo",
ID: "foo",
Namespace: "bar",
}
for _, alloc := range tc.RegisterAllocs {
tracker.Claim(alloc.ID, volume.ID, &UsageOptions{})
tracker.Claim(alloc.ID, volume.Namespace, volume.ID, &UsageOptions{})
}
result := false
for _, alloc := range tc.FreeAllocs {
result = tracker.Free(alloc.ID, volume.ID, &UsageOptions{})
result = tracker.Free(alloc.ID, volume.Namespace, volume.ID, &UsageOptions{})
}
require.Equal(t, tc.ExpectedResult, result, "Tracker State: %#v", tracker.state)

View File

@@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
@@ -73,8 +74,8 @@ func newVolumeManager(logger hclog.Logger, eventer TriggerNodeEvent, plugin csi.
}
}
func (v *volumeManager) stagingDirForVolume(root string, volID string, usage *UsageOptions) string {
return filepath.Join(root, StagingDirName, volID, usage.ToFS())
func (v *volumeManager) stagingDirForVolume(root string, volNS, volID string, usage *UsageOptions) string {
return filepath.Join(root, StagingDirName, volNS, volID, usage.ToFS())
}
func (v *volumeManager) allocDirForVolume(root string, volID, allocID string) string {
@@ -92,22 +93,22 @@ func (v *volumeManager) targetForVolume(root string, volID, allocID string, usag
// Returns whether the directory is a pre-existing mountpoint, the staging path,
// and any errors that occurred.
func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume, usage *UsageOptions) (string, bool, error) {
stagingPath := v.stagingDirForVolume(v.mountRoot, vol.ID, usage)
hostStagingPath := v.stagingDirForVolume(v.mountRoot, vol.Namespace, vol.ID, usage)
// Make the staging path, owned by the Nomad User
if err := os.MkdirAll(stagingPath, 0700); err != nil && !os.IsExist(err) {
if err := os.MkdirAll(hostStagingPath, 0700); err != nil && !os.IsExist(err) {
return "", false, fmt.Errorf("failed to create staging directory for volume (%s): %v", vol.ID, err)
}
// Validate that it is not already a mount point
m := mount.New()
isNotMount, err := m.IsNotAMountPoint(stagingPath)
isNotMount, err := m.IsNotAMountPoint(hostStagingPath)
if err != nil {
return "", false, fmt.Errorf("mount point detection failed for volume (%s): %v", vol.ID, err)
}
return stagingPath, !isNotMount, nil
return hostStagingPath, !isNotMount, nil
}
// ensureAllocDir attempts to create a directory for use when publishing a volume
@@ -167,7 +168,7 @@ func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume,
if err != nil {
return err
}
pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol.ID, usage)
pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol.Namespace, vol.ID, usage)
logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "host_staging_path", hostStagingPath, "plugin_staging_path", pluginStagingPath)
@@ -203,7 +204,7 @@ func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolum
logger := hclog.FromContext(ctx)
var pluginStagingPath string
if v.requiresStaging {
pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol.ID, usage)
pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol.Namespace, vol.ID, usage)
}
hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc, usage)
@@ -261,7 +262,7 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
}
if err == nil {
v.usageTracker.Claim(alloc.ID, vol.ID, usage)
v.usageTracker.Claim(alloc.ID, vol.ID, vol.Namespace, usage)
}
event := structs.NewNodeEvent().
@@ -284,10 +285,26 @@ func (v *volumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume,
// once for each staging path that a volume has been staged under.
// It is safe to call multiple times and a plugin is required to return OK if
// the volume has been unstaged or was never staged on the node.
func (v *volumeManager) unstageVolume(ctx context.Context, volID, remoteID string, usage *UsageOptions) error {
func (v *volumeManager) unstageVolume(ctx context.Context, volNS, volID, remoteID string, usage *UsageOptions) error {
logger := hclog.FromContext(ctx)
logger.Trace("Unstaging volume")
stagingPath := v.stagingDirForVolume(v.containerMountPoint, volID, usage)
// This is the staging path inside the container, which we pass to the
// plugin to perform unstaging
stagingPath := v.stagingDirForVolume(v.containerMountPoint, volNS, volID, usage)
// This is the path from the host, which we need to use to verify whether
// the path is the right one to pass to the plugin container
hostStagingPath := v.stagingDirForVolume(v.mountRoot, volNS, volID, usage)
_, err := os.Stat(hostStagingPath)
if err != nil && errors.Is(err, fs.ErrNotExist) {
// COMPAT: it's possible to get an unmount request that includes the
// namespace even for volumes that were mounted before the path included
// the namespace, so if the staging path doesn't exist, try the older
// path
stagingPath = v.stagingDirForVolume(v.containerMountPoint, "", volID, usage)
}
logger.Trace("unstaging volume", "staging_path", stagingPath)
// CSI NodeUnstageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
@@ -316,6 +333,9 @@ func combineErrors(maybeErrs ...error) error {
func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) error {
pluginTargetPath := v.targetForVolume(v.containerMountPoint, volID, allocID, usage)
logger := hclog.FromContext(ctx)
logger.Trace("unpublishing volume", "plugin_target_path", pluginTargetPath)
// CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
rpcErr := v.plugin.NodeUnpublishVolume(ctx, remoteID, pluginTargetPath,
@@ -335,6 +355,8 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al
return rpcErr
}
logger.Trace("removing host path", "host_target_path", hostTargetPath)
// Host Target Path was not cleaned up, attempt to do so here. If it's still
// a mount then removing the dir will fail and we'll return any rpcErr and the
// file error.
@@ -349,16 +371,17 @@ func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, al
return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
}
func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
logger := v.logger.With("volume_id", volID, "alloc_id", allocID)
func (v *volumeManager) UnmountVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usage *UsageOptions) (err error) {
logger := v.logger.With("volume_id", volID, "ns", volNS, "alloc_id", allocID)
ctx = hclog.WithContext(ctx, logger)
logger.Trace("unmounting volume")
err = v.unpublishVolume(ctx, volID, remoteID, allocID, usage)
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
canRelease := v.usageTracker.Free(allocID, volID, usage)
canRelease := v.usageTracker.Free(allocID, volID, volNS, usage)
if v.requiresStaging && canRelease {
err = v.unstageVolume(ctx, volID, remoteID, usage)
err = v.unstageVolume(ctx, volNS, volID, remoteID, usage)
}
}
@@ -384,7 +407,7 @@ func (v *volumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allo
}
// ExpandVolume sends a NodeExpandVolume request to the node plugin
func (v *volumeManager) ExpandVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions, capacity *csi.CapacityRange) (newCapacity int64, err error) {
func (v *volumeManager) ExpandVolume(ctx context.Context, volNS, volID, remoteID, allocID string, usage *UsageOptions, capacity *csi.CapacityRange) (newCapacity int64, err error) {
capability, err := csi.VolumeCapabilityFromStructs(usage.AttachmentMode, usage.AccessMode, usage.MountOptions)
if err != nil {
// nil may be acceptable, so let the node plugin decide.
@@ -392,12 +415,22 @@ func (v *volumeManager) ExpandVolume(ctx context.Context, volID, remoteID, alloc
"volume_id", volID, "alloc_id", allocID, "error", err)
}
stagingPath := v.stagingDirForVolume(v.containerMountPoint, volNS, volID, usage)
_, err = os.Stat(stagingPath)
if err != nil && errors.Is(err, fs.ErrNotExist) {
// COMPAT: it's possible to get an expand request that includes the
// namespace even for volumes that were mounted before the path included
// the namespace, so if the staging path doesn't exist, try the older
// path
stagingPath = v.stagingDirForVolume(v.containerMountPoint, "", volID, usage)
}
req := &csi.NodeExpandVolumeRequest{
ExternalVolumeID: remoteID,
CapacityRange: capacity,
Capability: capability,
TargetPath: v.targetForVolume(v.containerMountPoint, volID, allocID, usage),
StagingPath: v.stagingDirForVolume(v.containerMountPoint, volID, usage),
StagingPath: stagingPath,
}
resp, err := v.plugin.NodeExpandVolume(ctx, req,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),

View File

@@ -94,7 +94,8 @@ func TestVolumeManager_ensureStagingDir(t *testing.T) {
eventer := func(e *structs.NodeEvent) {}
manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake,
tmpPath, tmpPath, true, "i-example")
expectedStagingPath := manager.stagingDirForVolume(tmpPath, tc.Volume.ID, tc.UsageOptions)
expectedStagingPath := manager.stagingDirForVolume(tmpPath,
tc.Volume.Namespace, tc.Volume.ID, tc.UsageOptions)
if tc.CreateDirAheadOfTime {
err := os.MkdirAll(expectedStagingPath, 0700)
@@ -258,6 +259,7 @@ func TestVolumeManager_unstageVolume(t *testing.T) {
ctx := context.Background()
err := manager.unstageVolume(ctx,
tc.Volume.Namespace,
tc.Volume.ID, tc.Volume.RemoteID(), tc.UsageOptions)
if tc.ExpectedErr != nil {
@@ -514,7 +516,7 @@ func TestVolumeManager_MountVolumeEvents(t *testing.T) {
require.Equal(t, "true", e.Details["success"])
events = events[1:]
err = manager.UnmountVolume(ctx, vol.ID, vol.RemoteID(), alloc.ID, usage)
err = manager.UnmountVolume(ctx, vol.Namespace, vol.ID, vol.RemoteID(), alloc.ID, usage)
require.NoError(t, err)
require.Equal(t, 1, len(events))

View File

@@ -440,11 +440,12 @@ type ClientCSIControllerListSnapshotsResponse struct {
// a Nomad client to tell a CSI node plugin on that client to perform
// NodeUnpublish and NodeUnstage.
type ClientCSINodeDetachVolumeRequest struct {
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be unpublished (required)
AllocID string // ID of the allocation we're unpublishing for (required)
NodeID string // ID of the Nomad client targeted
ExternalID string // External ID of the volume to be unpublished (required)
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be unpublished (required)
VolumeNamespace string // Namespace of the volume to be unpublished (required)
AllocID string // ID of the allocation we're unpublishing for (required)
NodeID string // ID of the Nomad client targeted
ExternalID string // External ID of the volume to be unpublished (required)
// These fields should match the original volume request so that
// we can find the mount points on the client
@@ -459,9 +460,10 @@ type ClientCSINodeDetachVolumeResponse struct{}
// a Nomad client to tell a CSI node plugin on that client to perform
// NodeExpandVolume.
type ClientCSINodeExpandVolumeRequest struct {
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be expanded (required)
ExternalID string // External ID of the volume to be expanded (required)
PluginID string // ID of the plugin that manages the volume (required)
VolumeID string // ID of the volume to be expanded (required)
VolumeNamespace string // Namespace of the volume to be expanded (required)
ExternalID string // External ID of the volume to be expanded (required)
// Capacity range (required) to be sent to the node plugin
Capacity *csi.CapacityRange

View File

@@ -845,14 +845,15 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc
}
req := &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: vol.PluginID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: claim.AttachmentMode,
AccessMode: claim.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
PluginID: vol.PluginID,
VolumeID: vol.ID,
VolumeNamespace: vol.Namespace,
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: claim.AttachmentMode,
AccessMode: claim.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err := v.srv.RPC("ClientCSI.NodeDetachVolume",
req, &cstructs.ClientCSINodeDetachVolumeResponse{})
@@ -1295,11 +1296,12 @@ func (v *CSIVolume) nodeExpandVolume(vol *structs.CSIVolume, plugin *structs.CSI
resp := &cstructs.ClientCSINodeExpandVolumeResponse{}
req := &cstructs.ClientCSINodeExpandVolumeRequest{
PluginID: plugin.ID,
VolumeID: vol.ID,
ExternalID: vol.ExternalID,
Capacity: capacity,
Claim: claim,
PluginID: plugin.ID,
VolumeID: vol.ID,
VolumeNamespace: vol.Namespace,
ExternalID: vol.ExternalID,
Capacity: capacity,
Claim: claim,
}
if err := v.srv.RPC("ClientCSI.NodeExpandVolume", req, resp); err != nil {
mErr.Errors = append(mErr.Errors, err)

View File

@@ -33,6 +33,12 @@ func (c *CallCounter) Get() map[string]int {
return maps.Clone(c.counts)
}
func (c *CallCounter) Reset() {
c.lock.Lock()
defer c.lock.Unlock()
c.counts = make(map[string]int)
}
func (c *CallCounter) AssertCalled(t testing.T, name string) {
t.Helper()
counts := c.Get()