mirror of
https://github.com/kemko/nomad.git
synced 2026-01-01 16:05:42 +03:00
csi: rename volume Mounter to Manager (#18434)
to align with its broader purpose, and the volumeManager implementation
This commit is contained in:
@@ -260,12 +260,12 @@ func (c *csiHook) restoreMounts(results map[string]*volumePublishResult) error {
|
||||
}
|
||||
c.logger.Debug("found CSI plugin", "type", pType, "name", plugin)
|
||||
|
||||
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, plugin)
|
||||
manager, err := c.csimanager.ManagerForPlugin(c.shutdownCtx, plugin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
isMounted, err := mounter.HasMount(c.shutdownCtx, result.stub.MountInfo)
|
||||
isMounted, err := manager.HasMount(c.shutdownCtx, result.stub.MountInfo)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -350,7 +350,7 @@ func (c *csiHook) mountVolumes(results map[string]*volumePublishResult) error {
|
||||
}
|
||||
c.logger.Debug("found CSI plugin", "type", pType, "name", plugin)
|
||||
|
||||
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, plugin)
|
||||
manager, err := c.csimanager.ManagerForPlugin(c.shutdownCtx, plugin)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -362,7 +362,7 @@ func (c *csiHook) mountVolumes(results map[string]*volumePublishResult) error {
|
||||
MountOptions: result.request.MountOptions,
|
||||
}
|
||||
|
||||
mountInfo, err := mounter.MountVolume(
|
||||
mountInfo, err := manager.MountVolume(
|
||||
c.shutdownCtx, result.volume, c.alloc, usageOpts, result.publishContext)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -519,7 +519,7 @@ func (c *csiHook) unmountWithRetry(result *volumePublishResult) error {
|
||||
// NodeEvent
|
||||
func (c *csiHook) unmountImpl(result *volumePublishResult) error {
|
||||
|
||||
mounter, err := c.csimanager.MounterForPlugin(c.shutdownCtx, result.stub.PluginID)
|
||||
manager, err := c.csimanager.ManagerForPlugin(c.shutdownCtx, result.stub.PluginID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -531,7 +531,7 @@ func (c *csiHook) unmountImpl(result *volumePublishResult) error {
|
||||
MountOptions: result.request.MountOptions,
|
||||
}
|
||||
|
||||
return mounter.UnmountVolume(c.shutdownCtx,
|
||||
return manager.UnmountVolume(c.shutdownCtx,
|
||||
result.stub.VolumeID, result.stub.VolumeExternalID, c.alloc.ID, usageOpts)
|
||||
}
|
||||
|
||||
|
||||
@@ -227,7 +227,7 @@ func TestCSIHook(t *testing.T) {
|
||||
alloc.Job.TaskGroups[0].Volumes = tc.volumeRequests
|
||||
|
||||
callCounts := &callCounter{counts: map[string]int{}}
|
||||
mgr := mockPluginManager{mounter: mockVolumeMounter{
|
||||
mgr := mockPluginManager{mounter: mockVolumeManager{
|
||||
hasMounts: tc.startsWithValidMounts,
|
||||
callCounts: callCounts,
|
||||
failsFirstUnmount: pointer.Of(tc.failsFirstUnmount),
|
||||
@@ -342,7 +342,7 @@ func TestCSIHook_Prerun_Validation(t *testing.T) {
|
||||
alloc.Job.TaskGroups[0].Volumes = volumeRequests
|
||||
|
||||
callCounts := &callCounter{counts: map[string]int{}}
|
||||
mgr := mockPluginManager{mounter: mockVolumeMounter{
|
||||
mgr := mockPluginManager{mounter: mockVolumeManager{
|
||||
callCounts: callCounts,
|
||||
failsFirstUnmount: pointer.Of(false),
|
||||
}}
|
||||
@@ -469,20 +469,20 @@ func (r mockRPCer) testVolume(id string) *structs.CSIVolume {
|
||||
return vol
|
||||
}
|
||||
|
||||
type mockVolumeMounter struct {
|
||||
type mockVolumeManager struct {
|
||||
hasMounts bool
|
||||
failsFirstUnmount *bool
|
||||
callCounts *callCounter
|
||||
}
|
||||
|
||||
func (vm mockVolumeMounter) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *csimanager.UsageOptions, publishContext map[string]string) (*csimanager.MountInfo, error) {
|
||||
func (vm mockVolumeManager) MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *csimanager.UsageOptions, publishContext map[string]string) (*csimanager.MountInfo, error) {
|
||||
vm.callCounts.inc("mount")
|
||||
return &csimanager.MountInfo{
|
||||
Source: filepath.Join("test-alloc-dir", alloc.ID, vol.ID, usageOpts.ToFS()),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (vm mockVolumeMounter) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *csimanager.UsageOptions) error {
|
||||
func (vm mockVolumeManager) UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *csimanager.UsageOptions) error {
|
||||
vm.callCounts.inc("unmount")
|
||||
|
||||
if *vm.failsFirstUnmount {
|
||||
@@ -493,24 +493,24 @@ func (vm mockVolumeMounter) UnmountVolume(ctx context.Context, volID, remoteID,
|
||||
return nil
|
||||
}
|
||||
|
||||
func (vm mockVolumeMounter) HasMount(_ context.Context, mountInfo *csimanager.MountInfo) (bool, error) {
|
||||
func (vm mockVolumeManager) HasMount(_ context.Context, mountInfo *csimanager.MountInfo) (bool, error) {
|
||||
vm.callCounts.inc("hasMount")
|
||||
return mountInfo != nil && vm.hasMounts, nil
|
||||
}
|
||||
|
||||
func (vm mockVolumeMounter) ExternalID() string {
|
||||
func (vm mockVolumeManager) ExternalID() string {
|
||||
return "i-example"
|
||||
}
|
||||
|
||||
type mockPluginManager struct {
|
||||
mounter mockVolumeMounter
|
||||
mounter mockVolumeManager
|
||||
}
|
||||
|
||||
func (mgr mockPluginManager) WaitForPlugin(ctx context.Context, pluginType, pluginID string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (mgr mockPluginManager) MounterForPlugin(ctx context.Context, pluginID string) (csimanager.VolumeMounter, error) {
|
||||
func (mgr mockPluginManager) ManagerForPlugin(ctx context.Context, pluginID string) (csimanager.VolumeManager, error) {
|
||||
return mgr.mounter, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -474,7 +474,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
|
||||
ctx, cancelFn := c.requestContext()
|
||||
defer cancelFn()
|
||||
|
||||
mounter, err := c.c.csimanager.MounterForPlugin(ctx, req.PluginID)
|
||||
manager, err := c.c.csimanager.ManagerForPlugin(ctx, req.PluginID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("CSI.NodeDetachVolume: %v", err)
|
||||
}
|
||||
@@ -485,7 +485,7 @@ func (c *CSI) NodeDetachVolume(req *structs.ClientCSINodeDetachVolumeRequest, re
|
||||
AccessMode: req.AccessMode,
|
||||
}
|
||||
|
||||
err = mounter.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
|
||||
err = manager.UnmountVolume(ctx, req.VolumeID, req.ExternalID, req.AllocID, usageOpts)
|
||||
if err != nil && !errors.Is(err, nstructs.ErrCSIClientRPCIgnorable) {
|
||||
// if the unmounting previously happened but the server failed to
|
||||
// checkpoint, we'll get an error from Unmount but can safely
|
||||
|
||||
@@ -107,10 +107,10 @@ func (i *instanceManager) setupVolumeManager() {
|
||||
}
|
||||
}
|
||||
|
||||
// VolumeMounter returns the volume manager that is configured for the given plugin
|
||||
// VolumeManager returns the volume manager that is configured for the given plugin
|
||||
// instance. If called before the volume manager has been setup, it will block until
|
||||
// the volume manager is ready or the context is closed.
|
||||
func (i *instanceManager) VolumeMounter(ctx context.Context) (VolumeMounter, error) {
|
||||
func (i *instanceManager) VolumeManager(ctx context.Context) (VolumeManager, error) {
|
||||
select {
|
||||
case <-i.volumeManagerSetupCh:
|
||||
return i.volumeManager, nil
|
||||
|
||||
@@ -53,7 +53,7 @@ func (u *UsageOptions) ToFS() string {
|
||||
return sb.String()
|
||||
}
|
||||
|
||||
type VolumeMounter interface {
|
||||
type VolumeManager interface {
|
||||
MountVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usageOpts *UsageOptions, publishContext map[string]string) (*MountInfo, error)
|
||||
UnmountVolume(ctx context.Context, volID, remoteID, allocID string, usageOpts *UsageOptions) error
|
||||
HasMount(ctx context.Context, mountInfo *MountInfo) (bool, error)
|
||||
@@ -68,9 +68,9 @@ type Manager interface {
|
||||
// or until its context is canceled or times out.
|
||||
WaitForPlugin(ctx context.Context, pluginType, pluginID string) error
|
||||
|
||||
// MounterForPlugin returns a VolumeMounter for the plugin ID associated
|
||||
// ManagerForPlugin returns a VolumeManager for the plugin ID associated
|
||||
// with the volume. Returns an error if this plugin isn't registered.
|
||||
MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error)
|
||||
ManagerForPlugin(ctx context.Context, pluginID string) (VolumeManager, error)
|
||||
|
||||
// Shutdown shuts down the Manager and unmounts any locally attached volumes.
|
||||
Shutdown()
|
||||
|
||||
@@ -93,7 +93,7 @@ func (c *csiManager) WaitForPlugin(ctx context.Context, pType, pID string) error
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (VolumeMounter, error) {
|
||||
func (c *csiManager) ManagerForPlugin(ctx context.Context, pluginID string) (VolumeManager, error) {
|
||||
c.instancesLock.RLock()
|
||||
defer c.instancesLock.RUnlock()
|
||||
nodePlugins, hasAnyNodePlugins := c.instances["csi-node"]
|
||||
@@ -106,7 +106,7 @@ func (c *csiManager) MounterForPlugin(ctx context.Context, pluginID string) (Vol
|
||||
return nil, fmt.Errorf("plugin %s for type csi-node not found", pluginID)
|
||||
}
|
||||
|
||||
return mgr.VolumeMounter(ctx)
|
||||
return mgr.VolumeManager(ctx)
|
||||
}
|
||||
|
||||
// Run starts a plugin manager and should return early
|
||||
|
||||
@@ -20,7 +20,7 @@ import (
|
||||
"github.com/hashicorp/nomad/plugins/csi"
|
||||
)
|
||||
|
||||
var _ VolumeMounter = &volumeManager{}
|
||||
var _ VolumeManager = &volumeManager{}
|
||||
|
||||
const (
|
||||
DefaultMountActionTimeout = 2 * time.Minute
|
||||
|
||||
Reference in New Issue
Block a user