nomad/client/pluginmanager/csimanager/volume_test.go
Tim Gross 9bc2190508 CSI: serialize node plugin RPCs per-volume (#26832)
In #26831 we're preventing unexpected node RPCs by ensuring that the volume
watcher only unpublishes when allocations are client-terminal.

To mitigate any remaining similar issues, add serialization of node plugin RPCs,
as we did for controller plugin RPCs in #17996 and as recommended ("SHOULD") by
the CSI specification. Here we can do per-volume serialization rather than
per-plugin serialization.

Reorder the methods of the `volumeManager` in the client so that each interface
method and its directly-associated helper methods read top-to-bottom, rather
than in mixed order.

Ref: https://github.com/hashicorp/nomad/pull/17996
Ref: https://github.com/hashicorp/nomad/pull/26831
2025-09-25 11:29:44 -04:00
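
A minimal sketch of what per-volume serialization could look like, assuming a serializedOp(ctx, namespace, volID, fn) helper like the one exercised by TestVolumeManager_Serialization at the bottom of this file. The keyed-semaphore scheme and every name other than serializedOp are illustrative assumptions, not the code from this PR:

// Hypothetical sketch, not the actual Nomad implementation: one single-slot
// semaphore per (namespace, volume ID) so that node plugin RPCs on the same
// volume run one at a time, different volumes never block each other, and a
// waiter can give up when its context is cancelled.
package csisketch

import (
	"context"
	"sync"
)

type volKey struct{ ns, id string }

type volSerializer struct {
	mu   sync.Mutex
	sems map[volKey]chan struct{}
}

func newVolSerializer() *volSerializer {
	return &volSerializer{sems: map[volKey]chan struct{}{}}
}

// serializedOp runs fn while holding the volume's slot, or returns the
// context error if the slot cannot be acquired before ctx is done.
func (s *volSerializer) serializedOp(ctx context.Context, ns, id string, fn func() error) error {
	s.mu.Lock()
	key := volKey{ns: ns, id: id}
	sem, ok := s.sems[key]
	if !ok {
		sem = make(chan struct{}, 1)
		s.sems[key] = sem
	}
	s.mu.Unlock()

	select {
	case sem <- struct{}{}: // acquire the volume's single slot
	case <-ctx.Done():
		return ctx.Err()
	}
	defer func() { <-sem }() // release on return

	return fn()
}

The serialization test at the end of this file checks exactly these two properties: operations on the same volume queue behind each other, while operations on different volumes proceed concurrently.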

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package csimanager

import (
	"context"
	"errors"
	"os"
	"runtime"
	"sync"
	"testing"
	"time"

	"github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/ci"
	"github.com/hashicorp/nomad/helper/mount"
	"github.com/hashicorp/nomad/helper/testlog"
	"github.com/hashicorp/nomad/nomad/mock"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/plugins/csi"
	csifake "github.com/hashicorp/nomad/plugins/csi/fake"
	"github.com/shoenig/test/must"
	"github.com/stretchr/testify/require"
)
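
// checkMountSupport reports whether mount point detection works on this
// platform, so tests that depend on it can skip themselves when it doesn't.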
func checkMountSupport() bool {
	path, err := os.Getwd()
	if err != nil {
		return false
	}

	m := mount.New()
	_, err = m.IsNotAMountPoint(path)
	return err == nil
}
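
// TestVolumeManager_ensureStagingDir verifies that ensureStagingDir creates
// the per-volume staging directory when it is missing, tolerates a
// pre-existing directory, and reports whether the path is already mounted.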
func TestVolumeManager_ensureStagingDir(t *testing.T) {
	if !checkMountSupport() {
		t.Skip("mount point detection not supported for this platform")
	}
	ci.Parallel(t)

	cases := []struct {
		Name string
		Volume *structs.CSIVolume
		UsageOptions *UsageOptions
		CreateDirAheadOfTime bool
		MountDirAheadOfTime bool
		ExpectedErr error
		ExpectedMountState bool
	}{
		{
			Name: "Creates a directory when one does not exist",
			Volume: &structs.CSIVolume{ID: "foo"},
			UsageOptions: &UsageOptions{},
		},
		{
			Name: "Does not fail because of a pre-existing directory",
			Volume: &structs.CSIVolume{ID: "foo"},
			UsageOptions: &UsageOptions{},
			CreateDirAheadOfTime: true,
		},
		{
			Name: "Returns negative mount info",
			UsageOptions: &UsageOptions{},
			Volume: &structs.CSIVolume{ID: "foo"},
		},
		{
			Name: "Returns positive mount info",
			Volume: &structs.CSIVolume{ID: "foo"},
			UsageOptions: &UsageOptions{},
			CreateDirAheadOfTime: true,
			MountDirAheadOfTime: true,
			ExpectedMountState: true,
		},
	}

	for _, tc := range cases {
		t.Run(tc.Name, func(t *testing.T) {
			// Step 1: Validate that the test case makes sense
			if !tc.CreateDirAheadOfTime && tc.MountDirAheadOfTime {
				require.Fail(t, "Cannot Mount without creating a dir")
			}

			if tc.MountDirAheadOfTime {
				// We could enable these tests on Linux by mounting a fake
				// device, e.g. shipping a small ext4 image file and using it
				// as a loopback device, but there's no convenient way to
				// implement this.
				t.Skip("TODO: Skipped because we don't detect bind mounts")
			}

			// Step 2: Test Setup
			tmpPath := t.TempDir()

			csiFake := &csifake.Client{}
			eventer := func(e *structs.NodeEvent) {}
			manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake,
				tmpPath, tmpPath, true, "i-example")
			expectedStagingPath := manager.stagingDirForVolume(tmpPath,
				tc.Volume.Namespace, tc.Volume.ID, tc.UsageOptions)

			if tc.CreateDirAheadOfTime {
				err := os.MkdirAll(expectedStagingPath, 0700)
				require.NoError(t, err)
			}

			// Step 3: Now we can do some testing
			path, detectedMount, testErr := manager.ensureStagingDir(tc.Volume, tc.UsageOptions)
			if tc.ExpectedErr != nil {
				require.EqualError(t, testErr, tc.ExpectedErr.Error())
				return // We don't perform extra validation if an error was detected.
			}

			require.NoError(t, testErr)
			require.Equal(t, tc.ExpectedMountState, detectedMount)

			// If the ensureStagingDir call had to create the directory itself,
			// validate here that the directory exists and check its
			// permissions.
			if !tc.CreateDirAheadOfTime {
				file, err := os.Lstat(path)
				require.NoError(t, err)
				require.True(t, file.IsDir())

				// TODO: Figure out a windows equivalent of this test
				if runtime.GOOS != "windows" {
					require.Equal(t, os.FileMode(0700), file.Mode().Perm())
				}
			}
		})
	}
}
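
// TestVolumeManager_stageVolume verifies attachment/access mode validation
// and plugin error propagation in stageVolume.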
func TestVolumeManager_stageVolume(t *testing.T) {
	if !checkMountSupport() {
		t.Skip("mount point detection not supported for this platform")
	}
	ci.Parallel(t)

	cases := []struct {
		Name string
		Volume *structs.CSIVolume
		UsageOptions *UsageOptions
		PluginErr error
		ExpectedErr error
	}{
		{
			Name: "Returns an error when an invalid AttachmentMode is provided",
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{AttachmentMode: "nonsense"},
			ExpectedErr: errors.New("unknown volume attachment mode: nonsense"),
		},
		{
			Name: "Returns an error when an invalid AccessMode is provided",
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{
				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
				AccessMode: "nonsense",
			},
			ExpectedErr: errors.New("unknown volume access mode: nonsense"),
		},
		{
			Name: "Returns an error when the plugin returns an error",
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{
				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
				AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
			},
			PluginErr: errors.New("Some Unknown Error"),
			ExpectedErr: errors.New("Some Unknown Error"),
		},
		{
			Name: "Happy Path",
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{
				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
				AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
			},
			PluginErr: nil,
			ExpectedErr: nil,
		},
	}

	for _, tc := range cases {
		t.Run(tc.Name, func(t *testing.T) {
			tmpPath := t.TempDir()

			csiFake := &csifake.Client{}
			csiFake.NextNodeStageVolumeErr = tc.PluginErr

			eventer := func(e *structs.NodeEvent) {}
			manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake,
				tmpPath, tmpPath, true, "i-example")
			ctx := context.Background()

			err := manager.stageVolume(ctx, tc.Volume, tc.UsageOptions, nil)

			if tc.ExpectedErr != nil {
				require.EqualError(t, err, tc.ExpectedErr.Error())
			} else {
				require.NoError(t, err)
			}
		})
	}
}
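
// TestVolumeManager_unstageVolume verifies that unstageVolume calls
// NodeUnstageVolume exactly once and propagates plugin errors.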
func TestVolumeManager_unstageVolume(t *testing.T) {
	if !checkMountSupport() {
		t.Skip("mount point detection not supported for this platform")
	}
	ci.Parallel(t)

	cases := []struct {
		Name string
		Volume *structs.CSIVolume
		UsageOptions *UsageOptions
		PluginErr error
		ExpectedErr error
		ExpectedCSICallCount int64
	}{
		{
			Name: "Returns an error when the plugin returns an error",
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{},
			PluginErr: errors.New("Some Unknown Error"),
			ExpectedErr: errors.New("Some Unknown Error"),
			ExpectedCSICallCount: 1,
		},
		{
			Name: "Happy Path",
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{},
			PluginErr: nil,
			ExpectedErr: nil,
			ExpectedCSICallCount: 1,
		},
	}

	for _, tc := range cases {
		t.Run(tc.Name, func(t *testing.T) {
			tmpPath := t.TempDir()

			csiFake := &csifake.Client{}
			csiFake.NextNodeUnstageVolumeErr = tc.PluginErr

			eventer := func(e *structs.NodeEvent) {}
			manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake,
				tmpPath, tmpPath, true, "i-example")
			ctx := context.Background()

			err := manager.unstageVolume(ctx,
				tc.Volume.Namespace,
				tc.Volume.ID, tc.Volume.RemoteID(), tc.UsageOptions)

			if tc.ExpectedErr != nil {
				require.EqualError(t, err, tc.ExpectedErr.Error())
			} else {
				require.NoError(t, err)
			}

			require.Equal(t, tc.ExpectedCSICallCount, csiFake.NodeUnstageVolumeCallCount)
		})
	}
}
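
// TestVolumeManager_publishVolume verifies plugin error propagation and that
// mount options set on the volume (or overridden in the request) are carried
// into the CSI volume capability passed to the plugin.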
func TestVolumeManager_publishVolume(t *testing.T) {
	if !checkMountSupport() {
		t.Skip("mount point detection not supported for this platform")
	}
	ci.Parallel(t)

	cases := []struct {
		Name string
		Allocation *structs.Allocation
		Volume *structs.CSIVolume
		UsageOptions *UsageOptions
		PluginErr error
		ExpectedErr error
		ExpectedCSICallCount int64
		ExpectedVolumeCapability *csi.VolumeCapability
	}{
		{
			Name: "Returns an error when the plugin returns an error",
			Allocation: structs.MockAlloc(),
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{
				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
				AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
			},
			PluginErr: errors.New("Some Unknown Error"),
			ExpectedErr: errors.New("Some Unknown Error"),
			ExpectedCSICallCount: 1,
		},
		{
			Name: "Happy Path",
			Allocation: structs.MockAlloc(),
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{
				AttachmentMode: structs.CSIVolumeAttachmentModeBlockDevice,
				AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
			},
			PluginErr: nil,
			ExpectedErr: nil,
			ExpectedCSICallCount: 1,
		},
		{
			Name: "Mount options in the volume",
			Allocation: structs.MockAlloc(),
			Volume: &structs.CSIVolume{
				ID: "foo",
				MountOptions: &structs.CSIMountOptions{
					MountFlags: []string{"ro"},
				},
			},
			UsageOptions: &UsageOptions{
				AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
				AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
			},
			PluginErr: nil,
			ExpectedErr: nil,
			ExpectedCSICallCount: 1,
			ExpectedVolumeCapability: &csi.VolumeCapability{
				AccessType: csi.VolumeAccessTypeMount,
				AccessMode: csi.VolumeAccessModeMultiNodeMultiWriter,
				MountVolume: &structs.CSIMountOptions{
					MountFlags: []string{"ro"},
				},
			},
		},
		{
			Name: "Mount options override in the request",
			Allocation: structs.MockAlloc(),
			Volume: &structs.CSIVolume{
				ID: "foo",
				MountOptions: &structs.CSIMountOptions{
					MountFlags: []string{"ro"},
				},
			},
			UsageOptions: &UsageOptions{
				AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
				AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
				MountOptions: &structs.CSIMountOptions{
					MountFlags: []string{"rw"},
				},
			},
			PluginErr: nil,
			ExpectedErr: nil,
			ExpectedCSICallCount: 1,
			ExpectedVolumeCapability: &csi.VolumeCapability{
				AccessType: csi.VolumeAccessTypeMount,
				AccessMode: csi.VolumeAccessModeMultiNodeMultiWriter,
				MountVolume: &structs.CSIMountOptions{
					MountFlags: []string{"rw"},
				},
			},
		},
	}

	for _, tc := range cases {
		t.Run(tc.Name, func(t *testing.T) {
			tmpPath := t.TempDir()

			csiFake := &csifake.Client{}
			csiFake.NextNodePublishVolumeErr = tc.PluginErr

			eventer := func(e *structs.NodeEvent) {}
			manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake,
				tmpPath, tmpPath, true, "i-example")
			ctx := context.Background()

			_, err := manager.publishVolume(ctx, tc.Volume, tc.Allocation, tc.UsageOptions, nil)

			if tc.ExpectedErr != nil {
				require.EqualError(t, err, tc.ExpectedErr.Error())
			} else {
				require.NoError(t, err)
			}

			require.Equal(t, tc.ExpectedCSICallCount, csiFake.NodePublishVolumeCallCount)

			if tc.ExpectedVolumeCapability != nil {
				require.Equal(t, tc.ExpectedVolumeCapability, csiFake.PrevVolumeCapability)
			}
		})
	}
}
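
// TestVolumeManager_unpublishVolume verifies that unpublishVolume calls
// NodeUnpublishVolume exactly once and propagates plugin errors.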
func TestVolumeManager_unpublishVolume(t *testing.T) {
	if !checkMountSupport() {
		t.Skip("mount point detection not supported for this platform")
	}
	ci.Parallel(t)

	cases := []struct {
		Name string
		Allocation *structs.Allocation
		Volume *structs.CSIVolume
		UsageOptions *UsageOptions
		PluginErr error
		ExpectedErr error
		ExpectedCSICallCount int64
	}{
		{
			Name: "Returns an error when the plugin returns an error",
			Allocation: structs.MockAlloc(),
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{},
			PluginErr: errors.New("Some Unknown Error"),
			ExpectedErr: errors.New("Some Unknown Error"),
			ExpectedCSICallCount: 1,
		},
		{
			Name: "Happy Path",
			Allocation: structs.MockAlloc(),
			Volume: &structs.CSIVolume{
				ID: "foo",
			},
			UsageOptions: &UsageOptions{},
			PluginErr: nil,
			ExpectedErr: nil,
			ExpectedCSICallCount: 1,
		},
	}

	for _, tc := range cases {
		t.Run(tc.Name, func(t *testing.T) {
			tmpPath := t.TempDir()

			csiFake := &csifake.Client{}
			csiFake.NextNodeUnpublishVolumeErr = tc.PluginErr

			eventer := func(e *structs.NodeEvent) {}
			manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake,
				tmpPath, tmpPath, true, "i-example")
			ctx := context.Background()

			err := manager.unpublishVolume(ctx,
				tc.Volume.ID, tc.Volume.RemoteID(), tc.Allocation.ID, tc.UsageOptions)

			if tc.ExpectedErr != nil {
				require.EqualError(t, err, tc.ExpectedErr.Error())
			} else {
				require.NoError(t, err)
			}

			require.Equal(t, tc.ExpectedCSICallCount, csiFake.NodeUnpublishVolumeCallCount)
		})
	}
}
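
// TestVolumeManager_MountVolumeEvents verifies that MountVolume and
// UnmountVolume emit node events recording success or failure.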
func TestVolumeManager_MountVolumeEvents(t *testing.T) {
	if !checkMountSupport() {
		t.Skip("mount point detection not supported for this platform")
	}
	ci.Parallel(t)

	tmpPath := t.TempDir()
	csiFake := &csifake.Client{}

	var events []*structs.NodeEvent
	eventer := func(e *structs.NodeEvent) {
		events = append(events, e)
	}

	manager := newVolumeManager(testlog.HCLogger(t), eventer, csiFake,
		tmpPath, tmpPath, true, "i-example")
	ctx := context.Background()
	vol := &structs.CSIVolume{
		ID: "vol",
		Namespace: "ns",
	}
	alloc := mock.Alloc()
	usage := &UsageOptions{
		AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
	}
	pubCtx := map[string]string{}

	_, err := manager.MountVolume(ctx, vol, alloc, usage, pubCtx)
	require.Error(t, err, "unknown volume attachment mode: ")
	require.Equal(t, 1, len(events))
	e := events[0]
	require.Equal(t, "Mount volume", e.Message)
	require.Equal(t, "Storage", e.Subsystem)
	require.Equal(t, "vol", e.Details["volume_id"])
	require.Equal(t, "false", e.Details["success"])
	require.Equal(t, "unknown volume attachment mode: ", e.Details["error"])
	events = events[1:]

	usage.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
	_, err = manager.MountVolume(ctx, vol, alloc, usage, pubCtx)
	require.NoError(t, err)
	require.Equal(t, 1, len(events))
	e = events[0]
	require.Equal(t, "Mount volume", e.Message)
	require.Equal(t, "Storage", e.Subsystem)
	require.Equal(t, "vol", e.Details["volume_id"])
	require.Equal(t, "true", e.Details["success"])
	events = events[1:]

	err = manager.UnmountVolume(ctx, vol.Namespace, vol.ID, vol.RemoteID(), alloc.ID, usage)
	require.NoError(t, err)
	require.Equal(t, 1, len(events))
	e = events[0]
	require.Equal(t, "Unmount volume", e.Message)
	require.Equal(t, "Storage", e.Subsystem)
	require.Equal(t, "vol", e.Details["volume_id"])
	require.Equal(t, "true", e.Details["success"])
}

// TestVolumeManager_InterleavedStaging tests that a volume cannot be unstaged
// if another alloc has staged but not yet published
func TestVolumeManager_InterleavedStaging(t *testing.T) {
	ci.Parallel(t)

	tmpPath := t.TempDir()
	csiFake := &csifake.Client{}

	logger := testlog.HCLogger(t)
	ctx := hclog.WithContext(context.Background(), logger)

	manager := newVolumeManager(logger,
		func(e *structs.NodeEvent) {}, csiFake,
		tmpPath, tmpPath, true, "i-example")

	alloc0, alloc1 := mock.Alloc(), mock.Alloc()
	vol := &structs.CSIVolume{ID: "vol", Namespace: "ns"}
	usage := &UsageOptions{
		AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
		AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
	}
	pubCtx := map[string]string{}

	// first alloc has previously claimed the volume
	manager.usageTracker.Claim(alloc0.ID, vol.ID, vol.Namespace, usage)

	alloc0WaitCh := make(chan struct{})
	alloc1WaitCh := make(chan struct{})

	// this goroutine simulates MountVolume, but with control over interleaving
	// by waiting for the other alloc to check whether it should unstage before
	// trying to publish
	manager.usageTracker.Claim(alloc1.ID, vol.ID, vol.Namespace, usage)
	must.NoError(t, manager.stageVolume(ctx, vol, usage, pubCtx))

	go func() {
		defer close(alloc1WaitCh)
		<-alloc0WaitCh
		_, err := manager.publishVolume(ctx, vol, alloc1, usage, pubCtx)
		must.NoError(t, err)
	}()

	must.NoError(t, manager.UnmountVolume(ctx, vol.Namespace, vol.ID, "foo", alloc0.ID, usage))
	close(alloc0WaitCh)

	testTimeoutCtx, cancel := context.WithTimeout(context.TODO(), time.Second)
	t.Cleanup(cancel)

	select {
	case <-alloc1WaitCh:
	case <-testTimeoutCtx.Done():
		t.Fatal("test timed out")
	}

	key := volumeUsageKey{
		id: vol.ID,
		ns: vol.Namespace,
		usageOpts: *usage,
	}

	manager.usageTracker.stateMu.Lock()
	t.Cleanup(manager.usageTracker.stateMu.Unlock)

	must.Eq(t, []string{alloc1.ID}, manager.usageTracker.state[key])
	must.Eq(t, 1, csiFake.NodeUnpublishVolumeCallCount, must.Sprint("expected 1 unpublish call"))
	must.Eq(t, 0, csiFake.NodeUnstageVolumeCallCount, must.Sprint("expected no unstage call"))
}
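
// TestVolumeManager_Serialization verifies that serializedOp blocks
// concurrent operations on the same volume but not operations on
// different volumes.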
func TestVolumeManager_Serialization(t *testing.T) {
	ci.Parallel(t)

	tmpPath := t.TempDir()
	csiFake := &csifake.Client{}

	logger := testlog.HCLogger(t)
	ctx := hclog.WithContext(t.Context(), logger)

	manager := newVolumeManager(logger,
		func(e *structs.NodeEvent) {}, csiFake,
		tmpPath, tmpPath, true, "i-example")

	ctx, cancel := context.WithTimeout(t.Context(), time.Second)
	t.Cleanup(cancel)

	// test that an operation on a volume can block another operation on the
	// same volume
	//
	// we can't guarantee the goroutines will try to contend, so we'll force the
	// op in the goroutine to wait until we've entered the serialized function,
	// and then have the serialized function sleep. the wait + the op in the
	// goroutine should take at least as long as that sleep to complete and
	// return
	var wg sync.WaitGroup
	wg.Add(1)
	elapsedCh := make(chan time.Duration)

	go func() {
		now := time.Now()
		wg.Wait()
		manager.serializedOp(ctx, "ns", "vol0", func() error {
			return errors.New("two")
		})
		elapsedCh <- time.Since(now)
	}()

	manager.serializedOp(ctx, "ns", "vol0", func() error {
		wg.Done()
		time.Sleep(100 * time.Millisecond)
		return errors.New("one")
	})

	must.GreaterEq(t, 100*time.Millisecond, <-elapsedCh)

	// test that serialized ops for different volumes don't block each other
	var wg1 sync.WaitGroup
	var wg2 sync.WaitGroup
	wg1.Add(1)
	wg2.Add(1)
	errs := make(chan error, 2)

	go func() {
		errs <- manager.serializedOp(ctx, "ns", "vol0", func() error {
			// at this point we've entered the serialized op for vol0 and are
			// waiting to enter the serialized op for vol1. if serialization
			// blocks vol1's op, we'll never unblock here and will hit the
			// timeout
			wg1.Wait()
			wg2.Done()
			return errors.New("four")
		})
	}()

	errs <- manager.serializedOp(ctx, "ns", "vol1", func() error {
		wg1.Done() // unblock the first op
		wg2.Wait() // wait for the first op to make sure we're running concurrently
		return errors.New("five")
	})

	ctx2, cancel2 := context.WithTimeout(t.Context(), time.Second)
	t.Cleanup(cancel2)

	found := 0
	for {
		if found >= 2 {
			break
		}
		select {
		case <-errs:
			found++
		case <-ctx2.Done():
			t.Fatal("timed out waiting for error")
		}
	}
}