dynamic host volumes: change env vars, fixup auto-delete (#24943)

* plugin env: DHV_HOST_PATH->DHV_VOLUMES_DIR
* client config: host_volumes_dir
* plugin env: add namespace+nodepool
* only auto-delete after error saving client state
  on *initial* create
Daniel Bennett authored on 2025-01-27 11:36:53 -05:00; committed by GitHub
parent 890daba432 · commit 49c147bcd7
18 changed files with 244 additions and 161 deletions
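
In plugin terms, the environment change above amounts to the sketch below. The fragment is a hypothetical external plugin and assumes only variables named in this commit: the positional path argument and `DHV_HOST_PATH` are gone, the plugin derives its own path from `DHV_VOLUMES_DIR`, and namespace/node-pool context is now provided.

```bash
#!/usr/bin/env bash
# Hypothetical external plugin fragment, updated for this commit.
# Previously the target path arrived as "$2" and $DHV_HOST_PATH;
# now the plugin derives it from the volumes directory and the volume ID.
target="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"

# Newly available context for every operation:
#   $DHV_NAMESPACE   volume namespace
#   $DHV_NODE_POOL   node pool of the client running the plugin
# Provided on delete only, echoing what create returned:
#   $DHV_CREATED_PATH
```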

View File

@@ -539,8 +539,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
// set up dynamic host volume manager
c.hostVolumeManager = hvm.NewHostVolumeManager(logger, hvm.Config{
PluginDir: cfg.HostVolumePluginDir,
SharedMountDir: cfg.AllocMountsDir,
PluginDir: c.GetConfig().HostVolumePluginDir,
VolumesDir: c.GetConfig().HostVolumesDir,
NodePool: c.Node().NodePool,
StateMgr: c.stateDB,
UpdateNodeVols: c.batchNodeUpdates.updateNodeFromHostVolume,
})
@@ -702,6 +703,13 @@ func (c *Client) init() error {
c.stateDB = db
// Ensure host_volumes_dir config is not empty.
if conf.HostVolumesDir == "" {
conf = c.UpdateConfig(func(c *config.Config) {
c.HostVolumesDir = filepath.Join(conf.StateDir, "host_volumes")
})
}
// Ensure the alloc mounts dir exists if we are configured with a custom path.
if conf.AllocMountsDir != "" {
if err := os.MkdirAll(conf.AllocMountsDir, 0o711); err != nil {

View File

@@ -314,6 +314,10 @@ type Config struct {
// HostVolumes is a map of the configured host volumes by name.
HostVolumes map[string]*structs.ClientHostVolumeConfig
// HostVolumesDir is the suggested directory for plugins to put volumes.
// Volume plugins may ignore this suggestion, but we provide this default.
HostVolumesDir string
// HostVolumePluginDir is the directory with dynamic host volume plugins.
HostVolumePluginDir string

View File

@@ -44,7 +44,7 @@ func (h *DynamicHostVolumePluginFingerprint) Fingerprint(request *FingerprintReq
return nil
}
plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir)
plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir, request.Node.NodePool)
if err != nil {
if os.IsNotExist(err) {
h.logger.Debug("plugin dir does not exist", "dir", pluginDir)
@@ -74,10 +74,10 @@ func (h *DynamicHostVolumePluginFingerprint) Periodic() (bool, time.Duration) {
return false, 0
}
// GetHostVolumePluginVersions finds all the executable files on disk
// that respond to a Version call (arg $1 = 'version' / env $OPERATION = 'version')
// The return map's keys are plugin IDs, and the values are version strings.
func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string]string, error) {
// GetHostVolumePluginVersions finds all the executable files on disk that
// respond to a `fingerprint` call. The return map's keys are plugin IDs,
// and the values are version strings.
func GetHostVolumePluginVersions(log hclog.Logger, pluginDir, nodePool string) (map[string]string, error) {
files, err := helper.FindExecutableFiles(pluginDir)
if err != nil {
return nil, err
@@ -97,7 +97,7 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string
log := log.With("plugin_id", file)
p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "")
p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "", nodePool)
if err != nil {
log.Warn("error getting plugin", "error", err)
return

View File

@@ -31,7 +31,7 @@ func TestHostVolume(t *testing.T) {
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: tmp,
VolumesDir: tmp,
})
client.hostVolumeManager = manager
hostPathCreate := filepath.Join(tmp, "test-vol-id-1")
@@ -177,7 +177,7 @@ func TestHostVolume(t *testing.T) {
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: "host_volume_endpoint_test.go",
VolumesDir: "host_volume_endpoint_test.go",
})
req := &cstructs.ClientHostVolumeCreateRequest{

View File

@@ -23,14 +23,18 @@ import (
const (
// environment variables for external plugins
EnvOperation = "DHV_OPERATION"
EnvHostPath = "DHV_HOST_PATH"
EnvNodeID = "DHV_NODE_ID"
EnvVolumesDir = "DHV_VOLUMES_DIR"
EnvPluginDir = "DHV_PLUGIN_DIR"
EnvCreatedPath = "DHV_CREATED_PATH"
EnvNamespace = "DHV_NAMESPACE"
EnvVolumeName = "DHV_VOLUME_NAME"
EnvVolumeID = "DHV_VOLUME_ID"
EnvNodeID = "DHV_NODE_ID"
EnvNodePool = "DHV_NODE_POOL"
EnvCapacityMin = "DHV_CAPACITY_MIN_BYTES"
EnvCapacityMax = "DHV_CAPACITY_MAX_BYTES"
EnvPluginDir = "DHV_PLUGIN_DIR"
EnvParameters = "DHV_PARAMETERS"
)
@@ -62,10 +66,10 @@ const HostVolumePluginMkdirVersion = "0.0.1"
var _ HostVolumePlugin = &HostVolumePluginMkdir{}
// HostVolumePluginMkdir is a plugin that creates a directory within the
// specified TargetPath. It is built-in to Nomad, so is always available.
// specified VolumesDir. It is built-in to Nomad, so is always available.
type HostVolumePluginMkdir struct {
ID string
TargetPath string
VolumesDir string
log hclog.Logger
}
@@ -80,7 +84,7 @@ func (p *HostVolumePluginMkdir) Fingerprint(_ context.Context) (*PluginFingerpri
func (p *HostVolumePluginMkdir) Create(_ context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
path := filepath.Join(p.TargetPath, req.ID)
path := filepath.Join(p.VolumesDir, req.ID)
log := p.log.With(
"operation", "create",
"volume_id", req.ID,
@@ -102,7 +106,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
return nil, err
}
err := os.Mkdir(path, 0o700)
err := os.MkdirAll(path, 0o700)
if err != nil {
log.Debug("error with plugin", "error", err)
return nil, err
@@ -113,7 +117,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
}
func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
path := filepath.Join(p.TargetPath, req.ID)
path := filepath.Join(p.VolumesDir, req.ID)
log := p.log.With(
"operation", "delete",
"volume_id", req.ID,
@@ -135,7 +139,7 @@ var _ HostVolumePlugin = &HostVolumePluginExternal{}
// NewHostVolumePluginExternal returns an external host volume plugin
// if the specified executable exists on disk.
func NewHostVolumePluginExternal(log hclog.Logger,
pluginDir, filename, targetPath string) (*HostVolumePluginExternal, error) {
pluginDir, filename, volumesDir, nodePool string) (*HostVolumePluginExternal, error) {
// this should only be called with already-detected executables,
// but we'll double-check it anyway, so we can provide a tidy error message
// if it has changed between fingerprinting and execution.
@@ -153,8 +157,9 @@ func NewHostVolumePluginExternal(log hclog.Logger,
return &HostVolumePluginExternal{
ID: filename,
Executable: executable,
TargetPath: targetPath,
VolumesDir: volumesDir,
PluginDir: pluginDir,
NodePool: nodePool,
log: log,
}, nil
}
@@ -166,16 +171,17 @@ func NewHostVolumePluginExternal(log hclog.Logger,
type HostVolumePluginExternal struct {
ID string
Executable string
TargetPath string
VolumesDir string
PluginDir string
NodePool string
log hclog.Logger
}
// Fingerprint calls the executable with the following parameters:
// arguments: fingerprint
// arguments: $1=fingerprint
// environment:
// DHV_OPERATION=fingerprint
// - DHV_OPERATION=fingerprint
//
// Response should be valid JSON on stdout, with a "version" key, e.g.:
// {"version": "0.0.1"}
@@ -205,21 +211,23 @@ func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFing
}
// Create calls the executable with the following parameters:
// arguments: create {path to create}
// arguments: $1=create
// environment:
// DHV_OPERATION=create
// DHV_HOST_PATH={path to create}
// DHV_NODE_ID={Nomad node ID}
// DHV_VOLUME_NAME={name from the volume specification}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec}
// DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec}
// DHV_PARAMETERS={json of parameters from the volume spec}
// DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_OPERATION=create
// - DHV_VOLUMES_DIR={directory to put the volume in}
// - DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_NAMESPACE={volume namespace}
// - DHV_VOLUME_NAME={name from the volume specification}
// - DHV_VOLUME_ID={volume ID generated by Nomad}
// - DHV_NODE_ID={Nomad node ID}
// - DHV_NODE_POOL={Nomad node pool}
// - DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec, expressed in bytes}
// - DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec, expressed in bytes}
// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
//
// Response should be valid JSON on stdout with "path" and "bytes", e.g.:
// {"path": $HOST_PATH, "bytes": 50000000}
// "path" must be provided to confirm that the requested path is what was
// {"path": "/path/that/was/created", "bytes": 50000000}
// "path" must be provided to confirm the requested path is what was
// created by the plugin. "bytes" is the actual size of the volume created
// by the plugin; if excluded, it will default to 0.
//
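
As a concrete illustration of this contract, a minimal `create` handler might look like the following (hypothetical plugin; only the variables and response keys documented above are assumed):

```bash
#!/usr/bin/env bash
# Hypothetical plugin: handle the create operation per the documented contract.
if [ "$1" = 'create' ]; then
    target="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"
    mkdir -p "$target"
    # stdout must be JSON containing "path"; "bytes" defaults to 0 if omitted.
    printf '{"path": "%s", "bytes": %d}\n' "$target" "${DHV_CAPACITY_MIN_BYTES:-0}"
fi
```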
@@ -233,14 +241,22 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvOperation, "create"),
fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
// values from volume spec
fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
fmt.Sprintf("%s=%d", EnvCapacityMin, req.RequestedCapacityMinBytes),
fmt.Sprintf("%s=%d", EnvCapacityMax, req.RequestedCapacityMaxBytes),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvParameters, params),
}
stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars)
log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
stdout, _, err := p.runPlugin(ctx, log, "create", envVars)
if err != nil {
return nil, fmt.Errorf("error creating volume %q with plugin %q: %w", req.ID, p.ID, err)
}
@@ -253,20 +269,22 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
// an error here after the plugin has done who-knows-what.
return nil, err
}
// TODO: validate returned host path
return &pluginResp, nil
}
// Delete calls the executable with the following parameters:
// arguments: delete {path to create}
// arguments: $1=delete
// environment:
// DHV_OPERATION=delete
// DHV_HOST_PATH={path to create}
// DHV_NODE_ID={Nomad node ID}
// DHV_VOLUME_NAME={name from the volume specification}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_PARAMETERS={json of parameters from the volume spec}
// DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_OPERATION=delete
// - DHV_CREATED_PATH={path that `create` returned}
// - DHV_VOLUMES_DIR={directory that volumes should be put in}
// - DHV_PLUGIN_DIR={path to directory containing plugins}
// - DHV_NAMESPACE={volume namespace}
// - DHV_VOLUME_NAME={name from the volume specification}
// - DHV_VOLUME_ID={volume ID generated by Nomad}
// - DHV_NODE_ID={Nomad node ID}
// - DHV_NODE_POOL={Nomad node pool}
// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
//
// Response on stdout is discarded.
//
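
And the matching `delete` handler (again a hypothetical plugin): it removes the path that `create` reported back, and anything it prints on stdout is ignored.

```bash
#!/usr/bin/env bash
# Hypothetical plugin: handle the delete operation; stdout is discarded by Nomad.
if [ "$1" = 'delete' ]; then
    # Remove the path the earlier create call returned for this volume.
    if [ -n "$DHV_CREATED_PATH" ]; then
        rm -rf "$DHV_CREATED_PATH"
    fi
fi
```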
@@ -280,42 +298,38 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
return fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvOperation, "delete"),
fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
// from create response
fmt.Sprintf("%s=%s", EnvCreatedPath, req.HostPath),
// values from volume spec
fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvParameters, params),
}
_, _, err = p.runPlugin(ctx, "delete", req.ID, envVars)
log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
_, _, err = p.runPlugin(ctx, log, "delete", envVars)
if err != nil {
return fmt.Errorf("error deleting volume %q with plugin %q: %w", req.ID, p.ID, err)
}
return nil
}
// runPlugin executes the... executable with these additional env vars:
// DHV_OPERATION={op}
// DHV_HOST_PATH={path to create}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_PLUGIN_DIR={path to directory containing plugins}
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
op, volID string, env []string) (stdout, stderr []byte, err error) {
// runPlugin executes the... executable
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, log hclog.Logger,
op string, env []string) (stdout, stderr []byte, err error) {
path := filepath.Join(p.TargetPath, volID)
log := p.log.With(
"operation", op,
"volume_id", volID,
"path", path)
log = log.With("operation", op)
log.Debug("running plugin")
// set up plugin execution
cmd := exec.CommandContext(ctx, p.Executable, op, path)
cmd.Env = append([]string{
fmt.Sprintf("%s=%s", EnvOperation, op),
fmt.Sprintf("%s=%s", EnvHostPath, path),
fmt.Sprintf("%s=%s", EnvVolumeID, volID),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
}, env...)
cmd := exec.CommandContext(ctx, p.Executable, op)
cmd.Env = env
stdout, stderr, err = runCommand(cmd)

View File

@@ -21,7 +21,7 @@ func TestHostVolumePluginMkdir(t *testing.T) {
plug := &HostVolumePluginMkdir{
ID: "test-mkdir-plugin",
TargetPath: tmp,
VolumesDir: tmp,
log: testlog.HCLogger(t),
}
@@ -58,7 +58,7 @@ func TestHostVolumePluginMkdir(t *testing.T) {
t.Run("sad", func(t *testing.T) {
// can't mkdir inside a file
plug.TargetPath = "host_volume_plugin_test.go"
plug.VolumesDir = "host_volume_plugin_test.go"
resp, err := plug.Create(timeout(t),
&cstructs.ClientHostVolumeCreateRequest{
@@ -79,23 +79,25 @@ func TestNewHostVolumePluginExternal(t *testing.T) {
log := testlog.HCLogger(t)
var err error
_, err = NewHostVolumePluginExternal(log, ".", "non-existent", "target")
_, err = NewHostVolumePluginExternal(log, ".", "non-existent", "target", "")
must.ErrorIs(t, err, ErrPluginNotExists)
_, err = NewHostVolumePluginExternal(log, ".", "host_volume_plugin_test.go", "target")
_, err = NewHostVolumePluginExternal(log, ".", "host_volume_plugin_test.go", "target", "")
must.ErrorIs(t, err, ErrPluginNotExecutable)
t.Run("unix", func(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipped because windows") // db TODO(1.10.0)
}
p, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin.sh", "test-target")
p, err := NewHostVolumePluginExternal(log,
"./test_fixtures", "test_plugin.sh", "test-target", "test-pool")
must.NoError(t, err)
must.Eq(t, &HostVolumePluginExternal{
ID: "test_plugin.sh",
Executable: "test_fixtures/test_plugin.sh",
TargetPath: "test-target",
VolumesDir: "test-target",
PluginDir: "./test_fixtures",
NodePool: "test-pool",
log: log,
}, p)
})
@@ -116,7 +118,8 @@ func TestHostVolumePluginExternal(t *testing.T) {
t.Run("happy", func(t *testing.T) {
log, getLogs := logRecorder(t)
plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin.sh", tmp)
plug, err := NewHostVolumePluginExternal(log,
"./test_fixtures", "test_plugin.sh", tmp, "test-node-pool")
must.NoError(t, err)
// fingerprint
@@ -130,6 +133,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
&cstructs.ClientHostVolumeCreateRequest{
Name: "test-vol-name",
ID: volID,
Namespace: "test-namespace",
NodeID: "test-node",
RequestedCapacityMinBytes: 5,
RequestedCapacityMaxBytes: 10,
@@ -151,6 +155,8 @@ func TestHostVolumePluginExternal(t *testing.T) {
&cstructs.ClientHostVolumeDeleteRequest{
Name: "test-vol-name",
ID: volID,
HostPath: resp.Path,
Namespace: "test-namespace",
NodeID: "test-node",
Parameters: map[string]string{"key": "val"},
})
@@ -164,7 +170,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
t.Run("sad", func(t *testing.T) {
log, getLogs := logRecorder(t)
plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin_sad.sh", tmp)
plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin_sad.sh", tmp, "")
must.NoError(t, err)
v, err := plug.Fingerprint(timeout(t))
@@ -180,11 +186,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
resp, err := plug.Create(timeout(t),
&cstructs.ClientHostVolumeCreateRequest{
ID: volID,
NodeID: "test-node",
RequestedCapacityMinBytes: 5,
RequestedCapacityMaxBytes: 10,
Parameters: map[string]string{"key": "val"},
ID: volID,
})
must.EqError(t, err, `error creating volume "test-vol-id" with plugin "test_plugin_sad.sh": exit status 1`)
must.Nil(t, resp)
@@ -197,9 +199,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
err = plug.Delete(timeout(t),
&cstructs.ClientHostVolumeDeleteRequest{
ID: volID,
NodeID: "test-node",
Parameters: map[string]string{"key": "val"},
ID: volID,
})
must.EqError(t, err, `error deleting volume "test-vol-id" with plugin "test_plugin_sad.sh": exit status 1`)
logged = getLogs()

View File

@@ -35,9 +35,12 @@ type Config struct {
// PluginDir is where external plugins may be found.
PluginDir string
// SharedMountDir is where plugins should place the directory
// that will later become a volume HostPath
SharedMountDir string
// VolumesDir is where plugins should place the directory
// that will later become a volume's HostPath
VolumesDir string
// NodePool is passed into external plugin execution environment.
NodePool string
// StateMgr manages client state to restore on agent restarts.
StateMgr HostVolumeStateManager
@@ -51,7 +54,8 @@ type Config struct {
// and registers volumes with the client node.
type HostVolumeManager struct {
pluginDir string
sharedMountDir string
volumesDir string
nodePool string
stateMgr HostVolumeStateManager
updateNodeVols HostVolumeNodeUpdater
builtIns map[string]HostVolumePlugin
@@ -64,13 +68,14 @@ func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager
logger = logger.Named("host_volume_manager")
return &HostVolumeManager{
pluginDir: config.PluginDir,
sharedMountDir: config.SharedMountDir,
volumesDir: config.VolumesDir,
nodePool: config.NodePool,
stateMgr: config.StateMgr,
updateNodeVols: config.UpdateNodeVols,
builtIns: map[string]HostVolumePlugin{
HostVolumePluginMkdirID: &HostVolumePluginMkdir{
ID: HostVolumePluginMkdirID,
TargetPath: config.SharedMountDir,
VolumesDir: config.VolumesDir,
log: logger.With("plugin_id", HostVolumePluginMkdirID),
},
},
@@ -84,13 +89,16 @@ func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager
func (hvm *HostVolumeManager) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {
log := hvm.log.With("volume_name", req.Name, "volume_id", req.ID)
plug, err := hvm.getPlugin(req.PluginID)
if err != nil {
return nil, err
}
// can't have two of the same volume name w/ different IDs per client node
if err := hvm.locker.lock(req.Name, req.ID); err != nil {
isNewVolume, err := hvm.locker.lock(req.Name, req.ID)
if err != nil {
return nil, err
}
@@ -106,22 +114,26 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
CreateReq: req,
}
if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
// if we fail to write to state, delete the volume so it isn't left
// lying around without Nomad knowing about it.
hvm.log.Error("failed to save volume in state, so deleting", "volume_id", req.ID, "error", err)
delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{
ID: req.ID,
PluginID: req.PluginID,
NodeID: req.NodeID,
HostPath: hvm.sharedMountDir,
Parameters: req.Parameters,
})
if delErr != nil {
hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr)
err = multierror.Append(err, delErr)
// if we fail to write to state on initial create,
// delete the volume so it isn't left lying around
// without Nomad knowing about it.
log.Error("failed to save volume in client state", "error", err)
if isNewVolume {
log.Error("initial create detected, running delete")
delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{
ID: req.ID,
PluginID: req.PluginID,
NodeID: req.NodeID,
HostPath: hvm.volumesDir,
Parameters: req.Parameters,
})
if delErr != nil {
log.Warn("error deleting volume after state store failure", "error", delErr)
err = multierror.Append(err, delErr)
}
// free up the volume name whether delete succeeded or not.
hvm.locker.release(req.Name)
}
// free up the volume name whether delete succeeded or not.
hvm.locker.release(req.Name)
return nil, err
}
@@ -142,7 +154,7 @@ func (hvm *HostVolumeManager) Register(ctx context.Context,
req *cstructs.ClientHostVolumeRegisterRequest) error {
// can't have two of the same volume name w/ different IDs per client node
if err := hvm.locker.lock(req.Name, req.ID); err != nil {
if _, err := hvm.locker.lock(req.Name, req.ID); err != nil {
return err
}
@@ -216,7 +228,7 @@ func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
return plug, nil
}
log := hvm.log.With("plugin_id", id)
return NewHostVolumePluginExternal(log, hvm.pluginDir, id, hvm.sharedMountDir)
return NewHostVolumePluginExternal(log, hvm.pluginDir, id, hvm.volumesDir, hvm.nodePool)
}
// restoreFromState loads all volumes from client state and runs Create for
@@ -270,7 +282,7 @@ func (hvm *HostVolumeManager) restoreForCreate(ctx context.Context, vol *cstruct
}
// lock the name so future creates can't produce duplicates.
err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
_, err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
// state should never have duplicate vol names, and restore happens
// prior to node registration, so new creates shouldn't come in
// concurrently, but check for error just in case.
@@ -299,7 +311,7 @@ func (hvm *HostVolumeManager) restoreForCreate(ctx context.Context, vol *cstruct
// Register, by converting the stored struct. It otherwise behaves the same as
// restoreForCreate.
func (hvm *HostVolumeManager) restoreForRegister(vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) {
err := hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
_, err := hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
if err != nil {
hvm.log.Error("error during restore",
"volume_name", vol.CreateReq.Name,
@@ -340,20 +352,20 @@ type volLocker struct {
locks sync.Map
}
// lock the provided name, error if it was already locked with a different ID
func (l *volLocker) lock(name, id string) error {
// lock the provided name, return true if it was not already locked,
// and error if it was already locked with a different ID.
func (l *volLocker) lock(name, id string) (bool, error) {
current, exists := l.locks.LoadOrStore(name, id)
if exists && id != current.(string) {
return ErrVolumeNameExists
return false, ErrVolumeNameExists
}
return nil
return !exists, nil
}
func (l *volLocker) release(name string) {
l.locks.Delete(name)
}
// only used in tests to assert lock state
func (l *volLocker) isLocked(name string) bool {
_, locked := l.locks.Load(name)
return locked

View File

@@ -29,17 +29,17 @@ func TestHostVolumeManager(t *testing.T) {
memDB := cstate.NewMemDB(log)
node := newFakeNode(t)
hostPathCreate := t.TempDir()
hostPathRegister := t.TempDir()
volumesDirCreate := t.TempDir()
volumesDirRegister := t.TempDir()
hvm := NewHostVolumeManager(log, Config{
PluginDir: "./test_fixtures",
SharedMountDir: hostPathCreate,
VolumesDir: volumesDirCreate,
StateMgr: errDB,
UpdateNodeVols: node.updateVol,
})
plug := &fakePlugin{mountDir: hostPathCreate}
plug := &fakePlugin{volsDir: volumesDirCreate}
hvm.builtIns["test-plugin"] = plug
ctx := timeout(t)
@@ -87,7 +87,7 @@ func TestHostVolumeManager(t *testing.T) {
expectResp := &cstructs.ClientHostVolumeCreateResponse{
VolumeName: "created-volume",
VolumeID: "vol-id-1",
HostPath: hostPathCreate,
HostPath: filepath.Join(volumesDirCreate, "vol-id-1"),
CapacityBytes: 5,
}
must.Eq(t, expectResp, resp)
@@ -108,6 +108,15 @@ func TestHostVolumeManager(t *testing.T) {
must.NoError(t, err)
must.Eq(t, expectResp, resp)
// error saving state on restore/update should not run delete
hvm.stateMgr = errDB
resp, err = hvm.Create(ctx, req)
must.ErrorIs(t, err, cstate.ErrDBError)
must.Nil(t, resp)
must.Eq(t, "", plug.deleted)
plug.reset()
hvm.stateMgr = memDB
// duplicate create with the same vol name but different ID should fail
_, err = hvm.Create(ctx, &cstructs.ClientHostVolumeCreateRequest{
Name: name,
@@ -122,7 +131,7 @@ func TestHostVolumeManager(t *testing.T) {
req := &cstructs.ClientHostVolumeRegisterRequest{
ID: "vol-id-2",
Name: name,
HostPath: hostPathRegister,
HostPath: volumesDirRegister,
CapacityBytes: 1000,
}
err := hvm.Register(ctx, req)
@@ -182,7 +191,7 @@ func TestHostVolumeManager(t *testing.T) {
must.Eq(t, VolumeMap{
"registered-volume": &structs.ClientHostVolumeConfig{
Name: "registered-volume",
Path: hostPathRegister,
Path: volumesDirRegister,
ID: "vol-id-2",
},
}, node.vols, must.Sprint("created-volume should be deleted from node"))
@@ -208,7 +217,7 @@ func TestHostVolumeManager(t *testing.T) {
}
type fakePlugin struct {
mountDir string
volsDir string
created string
deleted string
fingerprintErr error
@@ -236,7 +245,7 @@ func (p *fakePlugin) Create(_ context.Context, req *cstructs.ClientHostVolumeCre
}
p.created = req.ID
return &HostVolumePluginCreateResponse{
Path: p.mountDir,
Path: filepath.Join(p.volsDir, req.ID),
SizeBytes: req.RequestedCapacityMinBytes,
}, nil
}
@@ -302,14 +311,14 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {
// new volume manager should load it from state and run Create,
// resulting in a volume directory in this mountDir.
mountDir := t.TempDir()
volPath := filepath.Join(mountDir, vol1.ID)
volsDir := t.TempDir()
volPath := filepath.Join(volsDir, vol1.ID)
hvm := NewHostVolumeManager(log, Config{
StateMgr: state,
UpdateNodeVols: node.updateVol,
PluginDir: "/wherever",
SharedMountDir: mountDir,
VolumesDir: volsDir,
})
vols, err := hvm.restoreFromState(timeout(t))

View File

@@ -11,41 +11,39 @@ test "$1" == "$DHV_OPERATION"
echo 'all operations should ignore stderr' 1>&2
# since we will be running `rm -rf` (frightening),
# check to make sure DHV_HOST_PATH has a uuid shape in it.
# Nomad generates a volume ID and includes it in the path.
validate_path() {
if [[ ! "$DHV_HOST_PATH" =~ "test-vol-id" ]]; then
1>&2 echo "expected test vol ID in the DHV_HOST_PATH; got: '$DHV_HOST_PATH'"
exit 1
fi
}
case $1 in
fingerprint)
echo '{"version": "0.0.2"}' ;;
create)
test "$2" == "$DHV_HOST_PATH"
test "$DHV_NODE_ID" == 'test-node'
test "$DHV_VOLUME_NAME" == 'test-vol-name'
test "$DHV_VOLUME_ID" == 'test-vol-id'
test "$DHV_PARAMETERS" == '{"key":"val"}'
test "$DHV_NAMESPACE" == 'test-namespace'
test "$DHV_CAPACITY_MIN_BYTES" -eq 5
test "$DHV_CAPACITY_MAX_BYTES" -eq 10
test "$DHV_NODE_ID" == 'test-node'
test "$DHV_NODE_POOL" == 'test-node-pool'
test "$DHV_PARAMETERS" == '{"key":"val"}'
test "$DHV_PLUGIN_DIR" == './test_fixtures'
validate_path "$DHV_HOST_PATH"
mkdir "$2"
printf '{"path": "%s", "bytes": 5, "context": %s}' "$2" "$DHV_PARAMETERS"
test -d "$DHV_VOLUMES_DIR"
target="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"
test "$target" != '/'
mkdir -p "$target"
printf '{"path": "%s", "bytes": 5}' "$target"
;;
delete)
test "$2" == "$DHV_HOST_PATH"
test "$DHV_NODE_ID" == 'test-node'
test "$DHV_NODE_POOL" == 'test-node-pool'
test "$DHV_NAMESPACE" == 'test-namespace'
test "$DHV_VOLUME_NAME" == 'test-vol-name'
test "$DHV_VOLUME_ID" == 'test-vol-id'
test "$DHV_PARAMETERS" == '{"key":"val"}'
test "$DHV_PLUGIN_DIR" == './test_fixtures'
validate_path "$DHV_HOST_PATH"
rm -rfv "$2" ;;
test -d "$DHV_VOLUMES_DIR"
target="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"
test "$target" != '/'
test "$DHV_CREATED_PATH" == "$target"
rm -rfv "$target"
;;
*)
echo "unknown operation $1"
exit 1 ;;

View File

@@ -4,6 +4,7 @@
package hostvolumemanager
import (
"path/filepath"
"testing"
"github.com/hashicorp/nomad/client/state"
@@ -106,11 +107,11 @@ func TestWaitForFirstFingerprint(t *testing.T) {
node := newFakeNode(t)
hvm := NewHostVolumeManager(log, Config{
PluginDir: "",
SharedMountDir: tmp,
VolumesDir: tmp,
StateMgr: memDB,
UpdateNodeVols: node.updateVol,
})
plug := &fakePlugin{mountDir: tmp}
plug := &fakePlugin{volsDir: tmp}
hvm.builtIns = map[string]HostVolumePlugin{
"test-plugin": plug,
}
@@ -136,7 +137,7 @@ func TestWaitForFirstFingerprint(t *testing.T) {
"vol-name": &structs.ClientHostVolumeConfig{
Name: "vol-name",
ID: "vol-id",
Path: tmp,
Path: filepath.Join(tmp, "vol-id"),
ReadOnly: false,
},
}, node.vols)

View File

@@ -22,6 +22,10 @@ type ClientHostVolumeCreateRequest struct {
// built-in plugin.
PluginID string
// Namespace is the Nomad namespace for the volume.
// It's in the client RPC to be included in plugin execution environment.
Namespace string
// NodeID is the node where the volume is placed. It's included in the
// client RPC request so that the server can route the request to the
// correct node.
@@ -88,13 +92,17 @@ type ClientHostVolumeDeleteRequest struct {
// built-in plugin.
PluginID string
// Namespace is the Nomad namespace for the volume.
// It's in the client RPC to be included in plugin execution environment.
Namespace string
// NodeID is the node where the volume is placed. It's included in the
// client RPC request so that the server can route the request to the
// correct node.
NodeID string
// Path is the host path where the volume's mount point was created. We send
// this from the server to allow verification by plugins
// HostPath is the host path where the volume's mount point was created.
// We send this from the server to allow verification by plugins.
HostPath string
// Parameters are an opaque map of parameters for the host volume plugin.

View File

@@ -724,6 +724,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
if agentConfig.DataDir != "" {
conf.StateDir = filepath.Join(agentConfig.DataDir, "client")
conf.AllocDir = filepath.Join(agentConfig.DataDir, "alloc")
conf.HostVolumesDir = filepath.Join(agentConfig.DataDir, "host_volumes")
conf.HostVolumePluginDir = filepath.Join(agentConfig.DataDir, "host_volume_plugins")
dataParent := filepath.Dir(agentConfig.DataDir)
conf.AllocMountsDir = filepath.Join(dataParent, "alloc_mounts")
@@ -740,6 +741,9 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
if agentConfig.Client.HostVolumePluginDir != "" {
conf.HostVolumePluginDir = agentConfig.Client.HostVolumePluginDir
}
if agentConfig.Client.HostVolumesDir != "" {
conf.HostVolumesDir = agentConfig.Client.HostVolumesDir
}
if agentConfig.Client.NetworkInterface != "" {
conf.NetworkInterface = agentConfig.Client.NetworkInterface
}

View File

@@ -108,6 +108,7 @@ func (c *Command) readConfig() *Config {
flags.StringVar(&cmdConfig.Client.StateDir, "state-dir", "", "")
flags.StringVar(&cmdConfig.Client.AllocDir, "alloc-dir", "", "")
flags.StringVar(&cmdConfig.Client.AllocMountsDir, "alloc-mounts-dir", "", "")
flags.StringVar(&cmdConfig.Client.HostVolumesDir, "host-volumes-dir", "", "")
flags.StringVar(&cmdConfig.Client.HostVolumePluginDir, "host-volume-plugin-dir", "", "")
flags.StringVar(&cmdConfig.Client.NodeClass, "node-class", "", "")
flags.StringVar(&cmdConfig.Client.NodePool, "node-pool", "", "")
@@ -386,6 +387,7 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
"plugin-dir": config.PluginDir,
"alloc-dir": config.Client.AllocDir,
"alloc-mounts-dir": config.Client.AllocMountsDir,
"host-volumes-dir": config.Client.HostVolumesDir,
"host-volume-plugin-dir": config.Client.HostVolumePluginDir,
"state-dir": config.Client.StateDir,
}
@@ -1563,6 +1565,10 @@ Client Options:
The default speed for network interfaces in MBits if the link speed can not
be determined dynamically.
-host-volumes-dir
Directory wherein host volume plugins should place volumes. The default is
<data-dir>/host_volumes.
-host-volume-plugin-dir
Directory containing dynamic host volume plugins. The default is
<data-dir>/host_volume_plugins.

View File

@@ -229,6 +229,10 @@ type ClientConfig struct {
// AllocMountsDir is the directory for storing mounts into allocation data
AllocMountsDir string `hcl:"alloc_mounts_dir"`
// HostVolumesDir is the suggested directory for plugins to put volumes.
// Volume plugins may ignore this suggestion, but we provide this default.
HostVolumesDir string `hcl:"host_volumes_dir"`
// HostVolumePluginDir directory contains dynamic host volume plugins
HostVolumePluginDir string `hcl:"host_volume_plugin_dir"`
@@ -2319,6 +2323,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
if b.AllocMountsDir != "" {
result.AllocMountsDir = b.AllocMountsDir
}
if b.HostVolumesDir != "" {
result.HostVolumesDir = b.HostVolumesDir
}
if b.HostVolumePluginDir != "" {
result.HostVolumePluginDir = b.HostVolumePluginDir
}

View File

@@ -40,7 +40,7 @@ fi
plugin="$1"
op="$2"
alloc_mounts="${3:-/tmp}"
volumes_dir="${3:-/tmp}"
uuid="${4:-74564d17-ce50-0bc1-48e5-6feaa41ede48}"
node_id='0b62d807-6101-a80f-374d-e1c430abbf47'
plugin_dir="$(dirname "$plugin")"
@@ -52,9 +52,9 @@ case $op in
;;
create)
args="create $alloc_mounts/$uuid"
args='create'
export DHV_OPERATION='create'
export DHV_HOST_PATH="$alloc_mounts/$uuid"
export DHV_VOLUMES_DIR="$volumes_dir"
export DHV_VOLUME_NAME=test
export DHV_VOLUME_ID="$uuid"
export DHV_NODE_ID="$node_id"
@@ -65,9 +65,9 @@ case $op in
;;
delete)
args="delete $alloc_mounts/$uuid"
args='delete'
export DHV_OPERATION='delete'
export DHV_HOST_PATH="$alloc_mounts/$uuid"
export DHV_VOLUMES_DIR="$volumes_dir"
export DHV_NODE_ID="$node_id"
export DHV_VOLUME_NAME=test
export DHV_VOLUME_ID="$uuid"

View File

@@ -45,10 +45,6 @@ for arg in "$@"; do
esac
done
# path is required for everything else
[ $# -lt 2 ] && { echo 'path required; seek --help' 1>&2; exit 1; }
host_path="$2"
# OS detect
if [[ "$OSTYPE" == "linux-"* ]]; then
ext=ext4
@@ -83,7 +79,7 @@ fi
validate_path() {
local path="$1"
if [[ ! "$path" =~ [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12} ]]; then
1>&2 echo "expected uuid-lookin ID in the HOST_PATH; got: '$path'"
1>&2 echo "expected uuid-lookin ID in target host path; got: '$path'"
return 1
fi
}
@@ -100,6 +96,7 @@ create_volume() {
# translate to mb for dd block size
local megs=$((bytes / 1024 / 1024)) # lazy, approximate
mkdir -p "$(dirname "$path")"
# the extra conditionals are for idempotency
if [ ! -f "$path.$ext" ]; then
mkfsExec "$path" $megs
@@ -118,6 +115,7 @@ delete_volume() {
rm -f "$path"."$ext"
}
host_path="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"
case "$1" in
"create")
create_volume "$host_path" "$DHV_CAPACITY_MIN_BYTES"

View File

@@ -463,6 +463,7 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
ID: vol.ID,
Name: vol.Name,
PluginID: vol.PluginID,
Namespace: vol.Namespace,
NodeID: vol.NodeID,
RequestedCapacityMinBytes: vol.RequestedCapacityMinBytes,
RequestedCapacityMaxBytes: vol.RequestedCapacityMaxBytes,
@@ -696,6 +697,7 @@ func (v *HostVolume) deleteVolume(vol *structs.HostVolume) error {
ID: vol.ID,
Name: vol.Name,
PluginID: vol.PluginID,
Namespace: vol.Namespace,
NodeID: vol.NodeID,
HostPath: vol.HostPath,
Parameters: vol.Parameters,

View File

@@ -206,6 +206,18 @@ client {
- `host_volume` <code>([host_volume](#host_volume-block): nil)</code> - Exposes
paths from the host as volumes that can be mounted into jobs.
- `host_volumes_dir` `(string: "")` - Specifies the directory wherein
host volume plugins should place volumes. When this parameter is empty, Nomad
generates the path using the [top-level `data_dir`][top_level_data_dir]
suffixed with `host_volumes`, like `"/opt/nomad/host_volumes"`.
This must be an absolute path.
- `host_volume_plugin_dir` `(string: "")` - Specifies the directory to find host
volume plugins. When this parameter is empty, Nomad generates the path
using the [top-level `data_dir`][top_level_data_dir] suffixed with
`host_volume_plugins`, like `"/opt/nomad/host_volume_plugins"`. This must be
an absolute path.
- `host_network` <code>([host_network](#host_network-block): nil)</code> - Registers
additional host networks with the node that can be selected when port mapping.