Files
nomad/client/pluginmanager/csimanager/volume.go
Tim Gross 9bc2190508 CSI: serialize node plugin RPCs per-volume (#26832)
In #26831 we're preventing unexpected node RPCs by ensuring that the volume
watcher only unpublishes when allocations are client-terminal.

To mitigate any remaining similar issues, add serialization of node plugin RPCs,
as we did for controller plugin RPCs in #17996 and as recommended ("SHOULD") by
the CSI specification. Here we can do per-volume serialization rather than
per-plugin serialization.

Reorder the methods of the `volumeManager` in the client so that each interface
method and its directly-associated helper methods read from top-to-bottom,
instead of a mix of directions.

Ref: https://github.com/hashicorp/nomad/pull/17996
Ref: https://github.com/hashicorp/nomad/pull/26831
2025-09-25 11:29:44 -04:00

554 lines
19 KiB
Go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package csimanager
import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"strings"
"sync"
"time"
grpc_retry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper/mount"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
)
var _ VolumeManager = &volumeManager{}
const (
DefaultMountActionTimeout = 2 * time.Minute
StagingDirName = "staging"
AllocSpecificDirName = "per-alloc"
)
// volumeManager handles the state of attached volumes for a given CSI Plugin.
//
// volumeManagers outlive the lifetime of a given allocation as volumes may be
// shared by multiple allocations on the same node.
//
// volumes are stored by an enriched volume usage struct as the CSI Spec requires
// slightly different usage based on the given usage model.
type volumeManager struct {
logger hclog.Logger
eventer TriggerNodeEvent
plugin csi.CSIPlugin
usageTracker *volumeUsageTracker
// mountRoot is the root of where plugin directories and mounts may be created
// e.g /opt/nomad.d/statedir/csi/my-csi-plugin/
mountRoot string
// containerMountPoint is the location _inside_ the plugin container that the
// `mountRoot` is bound in to.
containerMountPoint string
// requiresStaging shows whether the plugin requires that the volume manager
// calls NodeStageVolume and NodeUnstageVolume RPCs during setup and teardown
requiresStaging bool
// externalNodeID is the identity of a given nomad client as observed by the
// storage provider (ex. a hostname, VM instance ID, etc.)
externalNodeID string
inFlight map[structs.NamespacedID]context.Context
inFlightLock sync.Mutex
}
func newVolumeManager(logger hclog.Logger, eventer TriggerNodeEvent, plugin csi.CSIPlugin, rootDir, containerRootDir string, requiresStaging bool, externalID string) *volumeManager {
return &volumeManager{
logger: logger.Named("volume_manager"),
eventer: eventer,
plugin: plugin,
mountRoot: rootDir,
containerMountPoint: containerRootDir,
requiresStaging: requiresStaging,
usageTracker: newVolumeUsageTracker(),
externalNodeID: externalID,
inFlight: make(map[structs.NamespacedID]context.Context),
}
}
func (v *volumeManager) ExternalID() string {
return v.externalNodeID
}
func (v *volumeManager) HasMount(_ context.Context, mountInfo *MountInfo) (bool, error) {
m := mount.New()
isNotMount, err := m.IsNotAMountPoint(mountInfo.Source)
return !isNotMount, err
}
// MountVolume performs the steps required for using a given volume
// configuration for the provided allocation. It is passed the publishContext
// from remote attachment, and specific usage modes from the CSI Hook. It then
// uses this state to stage and publish the volume as required for use by the
// given allocation.
func (v *volumeManager) MountVolume(ctx context.Context,
vol *structs.CSIVolume, alloc *structs.Allocation,
usage *UsageOptions, publishContext map[string]string,
) (*MountInfo, error) {
var mountInfo *MountInfo
err := v.serializedOp(ctx, vol.Namespace, vol.ID, func() error {
var err error
mountInfo, err = v.mountVolumeImpl(ctx, vol, alloc, usage, publishContext)
return err
})
return mountInfo, err
}
func (v *volumeManager) mountVolumeImpl(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions, publishContext map[string]string) (mountInfo *MountInfo, err error) {
logger := v.logger.With("volume_id", vol.ID, "alloc_id", alloc.ID)
ctx = hclog.WithContext(ctx, logger)
// Claim before we stage/publish to prevent interleaved Unmount for another
// alloc from unstaging between stage/publish steps below
v.usageTracker.Claim(alloc.ID, vol.ID, vol.Namespace, usage)
if v.requiresStaging {
err = v.stageVolume(ctx, vol, usage, publishContext)
}
if err == nil {
mountInfo, err = v.publishVolume(ctx, vol, alloc, usage, publishContext)
}
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemStorage).
SetMessage("Mount volume").
AddDetail("volume_id", vol.ID)
if err == nil {
event.AddDetail("success", "true")
} else {
event.AddDetail("success", "false")
event.AddDetail("error", err.Error())
v.usageTracker.Free(alloc.ID, vol.ID, vol.Namespace, usage)
}
v.eventer(event)
return mountInfo, err
}
// stageVolume prepares a volume for use by allocations. When a plugin exposes
// the STAGE_UNSTAGE_VOLUME capability it MUST be called once-per-volume for a
// given usage mode before the volume can be NodePublish-ed.
func (v *volumeManager) stageVolume(ctx context.Context, vol *structs.CSIVolume, usage *UsageOptions, publishContext map[string]string) error {
logger := hclog.FromContext(ctx)
logger.Trace("Preparing volume staging environment")
hostStagingPath, isMount, err := v.ensureStagingDir(vol, usage)
if err != nil {
return err
}
pluginStagingPath := v.stagingDirForVolume(v.containerMountPoint, vol.Namespace, vol.ID, usage)
logger.Trace("Volume staging environment", "pre-existing_mount", isMount, "host_staging_path", hostStagingPath, "plugin_staging_path", pluginStagingPath)
if isMount {
logger.Debug("re-using existing staging mount for volume", "staging_path", hostStagingPath)
return nil
}
capability, err := volumeCapability(vol, usage)
if err != nil {
return err
}
req := &csi.NodeStageVolumeRequest{
ExternalID: vol.RemoteID(),
PublishContext: publishContext,
StagingTargetPath: pluginStagingPath,
VolumeCapability: capability,
Secrets: vol.Secrets,
VolumeContext: vol.Context,
}
// CSI NodeStageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeStageVolume(ctx, req,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
)
}
func (v *volumeManager) publishVolume(ctx context.Context, vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions, publishContext map[string]string) (*MountInfo, error) {
logger := hclog.FromContext(ctx)
var pluginStagingPath string
if v.requiresStaging {
pluginStagingPath = v.stagingDirForVolume(v.containerMountPoint, vol.Namespace, vol.ID, usage)
}
hostTargetPath, isMount, err := v.ensureAllocDir(vol, alloc, usage)
if err != nil {
return nil, err
}
pluginTargetPath := v.targetForVolume(v.containerMountPoint, vol.ID, alloc.ID, usage)
if isMount {
logger.Debug("Re-using existing published volume for allocation")
return &MountInfo{Source: hostTargetPath}, nil
}
capabilities, err := volumeCapability(vol, usage)
if err != nil {
return nil, err
}
// CSI NodePublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
err = v.plugin.NodePublishVolume(ctx, &csi.NodePublishVolumeRequest{
ExternalID: vol.RemoteID(),
PublishContext: publishContext,
StagingTargetPath: pluginStagingPath,
TargetPath: pluginTargetPath,
VolumeCapability: capabilities,
Readonly: usage.ReadOnly,
Secrets: vol.Secrets,
VolumeContext: vol.Context,
},
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
)
return &MountInfo{Source: hostTargetPath}, err
}
// UnmountVolume unpublishes the volume for a specific allocation, and unstages
// the volume if there are no more allocations claiming it on the node
func (v *volumeManager) UnmountVolume(ctx context.Context,
volNS, volID, remoteID, allocID string, usage *UsageOptions,
) error {
return v.serializedOp(ctx, volNS, volID, func() error {
return v.unmountVolumeImpl(ctx, volNS, volID, remoteID, allocID, usage)
})
}
func (v *volumeManager) unmountVolumeImpl(ctx context.Context, volNS, volID, remoteID, allocID string, usage *UsageOptions) error {
logger := v.logger.With("volume_id", volID, "ns", volNS, "alloc_id", allocID)
ctx = hclog.WithContext(ctx, logger)
logger.Trace("unmounting volume")
err := v.unpublishVolume(ctx, volID, remoteID, allocID, usage)
if err == nil || errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
canRelease := v.usageTracker.Free(allocID, volID, volNS, usage)
if v.requiresStaging && canRelease {
err = v.unstageVolume(ctx, volNS, volID, remoteID, usage)
}
}
if errors.Is(err, structs.ErrCSIClientRPCIgnorable) {
logger.Trace("unmounting volume failed with ignorable error", "error", err)
err = nil
}
event := structs.NewNodeEvent().
SetSubsystem(structs.NodeEventSubsystemStorage).
SetMessage("Unmount volume").
AddDetail("volume_id", volID)
if err == nil {
event.AddDetail("success", "true")
} else {
event.AddDetail("success", "false")
event.AddDetail("error", err.Error())
}
v.eventer(event)
return err
}
func (v *volumeManager) unpublishVolume(ctx context.Context, volID, remoteID, allocID string, usage *UsageOptions) error {
pluginTargetPath := v.targetForVolume(v.containerMountPoint, volID, allocID, usage)
logger := hclog.FromContext(ctx)
logger.Trace("unpublishing volume", "plugin_target_path", pluginTargetPath)
// CSI NodeUnpublishVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
rpcErr := v.plugin.NodeUnpublishVolume(ctx, remoteID, pluginTargetPath,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
)
hostTargetPath := v.targetForVolume(v.mountRoot, volID, allocID, usage)
if _, err := os.Stat(hostTargetPath); os.IsNotExist(err) {
if rpcErr != nil && strings.Contains(rpcErr.Error(), "no mount point") {
// host target path was already destroyed, nothing to do here.
// this helps us in the case that a previous GC attempt cleaned
// up the volume on the node but the controller RPCs failed
rpcErr = fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
}
return rpcErr
}
logger.Trace("removing host path", "host_target_path", hostTargetPath)
// Host Target Path was not cleaned up, attempt to do so here. If it's still
// a mount then removing the dir will fail and we'll return any rpcErr and the
// file error.
rmErr := os.Remove(hostTargetPath)
if rmErr != nil {
return combineErrors(rpcErr, rmErr)
}
// We successfully removed the directory, return any rpcErrors that were
// encountered, but because we got here, they were probably flaky or was
// cleaned up externally.
return fmt.Errorf("%w: %v", structs.ErrCSIClientRPCIgnorable, rpcErr)
}
// unstageVolume is the inverse operation of `stageVolume` and must be called
// once for each staging path that a volume has been staged under.
// It is safe to call multiple times and a plugin is required to return OK if
// the volume has been unstaged or was never staged on the node.
func (v *volumeManager) unstageVolume(ctx context.Context, volNS, volID, remoteID string, usage *UsageOptions) error {
logger := hclog.FromContext(ctx)
// This is the staging path inside the container, which we pass to the
// plugin to perform unstaging
stagingPath := v.stagingDirForVolume(v.containerMountPoint, volNS, volID, usage)
// This is the path from the host, which we need to use to verify whether
// the path is the right one to pass to the plugin container
hostStagingPath := v.stagingDirForVolume(v.mountRoot, volNS, volID, usage)
_, err := os.Stat(hostStagingPath)
if err != nil && errors.Is(err, fs.ErrNotExist) {
// COMPAT: it's possible to get an unmount request that includes the
// namespace even for volumes that were mounted before the path included
// the namespace, so if the staging path doesn't exist, try the older
// path
stagingPath = v.stagingDirForVolume(v.containerMountPoint, "", volID, usage)
}
logger.Trace("unstaging volume", "staging_path", stagingPath)
// CSI NodeUnstageVolume errors for timeout, codes.Unavailable and
// codes.ResourceExhausted are retried; all other errors are fatal.
return v.plugin.NodeUnstageVolume(ctx,
remoteID,
stagingPath,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
)
}
// ExpandVolume sends a NodeExpandVolume request to the node plugin
func (v *volumeManager) ExpandVolume(ctx context.Context,
volNS, volID, remoteID, allocID string,
usage *UsageOptions, capacity *csi.CapacityRange,
) (int64, error) {
var newCapacity int64
err := v.serializedOp(ctx, volNS, volID, func() error {
var err error
newCapacity, err = v.expandVolumeImpl(
ctx, volNS, volID, remoteID, allocID, usage, capacity)
return err
})
return newCapacity, err
}
func (v *volumeManager) expandVolumeImpl(ctx context.Context, volNS, volID, remoteID, allocID string, usage *UsageOptions, capacity *csi.CapacityRange) (newCapacity int64, err error) {
capability, err := csi.VolumeCapabilityFromStructs(usage.AttachmentMode, usage.AccessMode, usage.MountOptions)
if err != nil {
// nil may be acceptable, so let the node plugin decide.
v.logger.Warn("ExpandVolume: unable to detect volume capability",
"volume_id", volID, "alloc_id", allocID, "error", err)
}
// This is the staging path inside the container, which we pass to the
// plugin to perform expansion
stagingPath := v.stagingDirForVolume(v.containerMountPoint, volNS, volID, usage)
// This is the path from the host, which we need to use to verify whether
// the path is the right one to pass to the plugin container
hostStagingPath := v.stagingDirForVolume(v.mountRoot, volNS, volID, usage)
_, err = os.Stat(hostStagingPath)
if err != nil && errors.Is(err, fs.ErrNotExist) {
// COMPAT: it's possible to get an unmount request that includes the
// namespace even for volumes that were mounted before the path included
// the namespace, so if the staging path doesn't exist, try the older
// path
stagingPath = v.stagingDirForVolume(v.containerMountPoint, "", volID, usage)
}
req := &csi.NodeExpandVolumeRequest{
ExternalVolumeID: remoteID,
CapacityRange: capacity,
Capability: capability,
TargetPath: v.targetForVolume(v.containerMountPoint, volID, allocID, usage),
StagingPath: stagingPath,
}
resp, err := v.plugin.NodeExpandVolume(ctx, req,
grpc_retry.WithPerRetryTimeout(DefaultMountActionTimeout),
grpc_retry.WithMax(3),
grpc_retry.WithBackoff(grpc_retry.BackoffExponential(100*time.Millisecond)),
)
if err != nil {
return 0, err
}
if resp == nil {
return 0, errors.New("nil response from plugin.NodeExpandVolume")
}
return resp.CapacityBytes, nil
}
func (v *volumeManager) stagingDirForVolume(root string, volNS, volID string, usage *UsageOptions) string {
return filepath.Join(root, StagingDirName, volNS, volID, usage.ToFS())
}
func (v *volumeManager) allocDirForVolume(root string, volID, allocID string) string {
return filepath.Join(root, AllocSpecificDirName, allocID, volID)
}
func (v *volumeManager) targetForVolume(root string, volID, allocID string, usage *UsageOptions) string {
return filepath.Join(root, AllocSpecificDirName, allocID, volID, usage.ToFS())
}
// ensureStagingDir attempts to create a directory for use when staging a volume
// and then validates that the path is not already a mount point for e.g an
// existing volume stage.
//
// Returns whether the directory is a pre-existing mountpoint, the staging path,
// and any errors that occurred.
func (v *volumeManager) ensureStagingDir(vol *structs.CSIVolume, usage *UsageOptions) (string, bool, error) {
hostStagingPath := v.stagingDirForVolume(v.mountRoot, vol.Namespace, vol.ID, usage)
// Make the staging path, owned by the Nomad User
if err := os.MkdirAll(hostStagingPath, 0700); err != nil && !os.IsExist(err) {
return "", false, fmt.Errorf("failed to create staging directory for volume (%s): %v", vol.ID, err)
}
// Validate that it is not already a mount point
m := mount.New()
isNotMount, err := m.IsNotAMountPoint(hostStagingPath)
if err != nil {
return "", false, fmt.Errorf("mount point detection failed for volume (%s): %v", vol.ID, err)
}
return hostStagingPath, !isNotMount, nil
}
// ensureAllocDir attempts to create a directory for use when publishing a volume
// and then validates that the path is not already a mount point (e.g when reattaching
// to existing allocs).
//
// Returns whether the directory is a pre-existing mountpoint, the publish path,
// and any errors that occurred.
func (v *volumeManager) ensureAllocDir(vol *structs.CSIVolume, alloc *structs.Allocation, usage *UsageOptions) (string, bool, error) {
allocPath := v.allocDirForVolume(v.mountRoot, vol.ID, alloc.ID)
// Make the alloc path, owned by the Nomad User
if err := os.MkdirAll(allocPath, 0700); err != nil && !os.IsExist(err) {
return "", false, fmt.Errorf("failed to create allocation directory for volume (%s): %v", vol.ID, err)
}
// Validate that the target is not already a mount point
targetPath := v.targetForVolume(v.mountRoot, vol.ID, alloc.ID, usage)
m := mount.New()
isNotMount, err := m.IsNotAMountPoint(targetPath)
switch {
case errors.Is(err, os.ErrNotExist):
// ignore; path does not exist and as such is not a mount
case err != nil:
return "", false, fmt.Errorf("mount point detection failed for volume (%s): %v", vol.ID, err)
}
return targetPath, !isNotMount, nil
}
func volumeCapability(vol *structs.CSIVolume, usage *UsageOptions) (*csi.VolumeCapability, error) {
var opts *structs.CSIMountOptions
if vol.MountOptions == nil {
opts = usage.MountOptions
} else {
opts = vol.MountOptions.Copy()
opts.Merge(usage.MountOptions)
}
capability, err := csi.VolumeCapabilityFromStructs(usage.AttachmentMode, usage.AccessMode, opts)
if err != nil {
return nil, err
}
return capability, nil
}
func combineErrors(maybeErrs ...error) error {
var result *multierror.Error
for _, err := range maybeErrs {
if err == nil {
continue
}
result = multierror.Append(result, err)
}
return result.ErrorOrNil()
}
// serializedOp ensures that we only have one in-flight request per volume, and
// keeps multi-step operations (ex. stage + publish) together in a single batch
// rather than potentially interleaving
func (v *volumeManager) serializedOp(ctx context.Context, volumeNS, volumeID string, fn func() error) error {
id := structs.NewNamespacedID(volumeID, volumeNS)
for {
v.inFlightLock.Lock()
future := v.inFlight[id]
if future == nil {
future, resolveFuture := context.WithCancel(ctx)
v.inFlight[id] = future
v.inFlightLock.Unlock()
err := fn()
// close the future while holding the lock and not in a defer so
// that we can ensure we've cleared it from the map before allowing
// anyone else to take the lock and write a new one
v.inFlightLock.Lock()
resolveFuture()
delete(v.inFlight, id)
v.inFlightLock.Unlock()
return err
} else {
v.inFlightLock.Unlock()
select {
case <-future.Done():
continue
case <-ctx.Done():
return nil // agent shutdown
}
}
}
}