From 49c147bcd7266b6b8995a9d5eebe97cb562d3e9c Mon Sep 17 00:00:00 2001
From: Daniel Bennett
Date: Mon, 27 Jan 2025 11:36:53 -0500
Subject: [PATCH] dynamic host volumes: change env vars, fixup auto-delete
 (#24943)

* plugin env: DHV_HOST_PATH->DHV_VOLUMES_DIR
* client config: host_volumes_dir
* plugin env: add namespace+nodepool
* only auto-delete after error saving client state on *initial* create
---
 client/client.go                                 |  12 +-
 client/config/config.go                          |   4 +
 client/fingerprint/dynamic_host_volumes.go       |  12 +-
 client/host_volume_endpoint_test.go              |   4 +-
 .../hostvolumemanager/host_volume_plugin.go      | 130 ++++++++++--------
 .../host_volume_plugin_test.go                   |  32 ++---
 client/hostvolumemanager/host_volumes.go         |  74 +++++-----
 client/hostvolumemanager/host_volumes_test.go    |  33 +++--
 .../test_fixtures/test_plugin.sh                 |  36 +++--
 .../volume_fingerprint_test.go                   |   7 +-
 client/structs/host_volumes.go                   |  12 +-
 command/agent/agent.go                           |   4 +
 command/agent/command.go                         |   6 +
 command/agent/config.go                          |   7 +
 demo/hostvolume/_test-plugin.sh                  |  10 +-
 demo/hostvolume/example-plugin-mkfs              |   8 +-
 nomad/host_volume_endpoint.go                    |   2 +
 website/content/docs/configuration/client.mdx    |  12 ++
 18 files changed, 244 insertions(+), 161 deletions(-)

diff --git a/client/client.go b/client/client.go
index a83667fa3..57361320f 100644
--- a/client/client.go
+++ b/client/client.go
@@ -539,8 +539,9 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
 	// set up dynamic host volume manager
 	c.hostVolumeManager = hvm.NewHostVolumeManager(logger, hvm.Config{
-		PluginDir:      cfg.HostVolumePluginDir,
-		SharedMountDir: cfg.AllocMountsDir,
+		PluginDir:      c.GetConfig().HostVolumePluginDir,
+		VolumesDir:     c.GetConfig().HostVolumesDir,
+		NodePool:       c.Node().NodePool,
 		StateMgr:       c.stateDB,
 		UpdateNodeVols: c.batchNodeUpdates.updateNodeFromHostVolume,
 	})
@@ -702,6 +703,13 @@ func (c *Client) init() error {
 	c.stateDB = db

+	// Ensure host_volumes_dir config is not empty.
+	if conf.HostVolumesDir == "" {
+		conf = c.UpdateConfig(func(c *config.Config) {
+			c.HostVolumesDir = filepath.Join(conf.StateDir, "host_volumes")
+		})
+	}
+
 	// Ensure the alloc mounts dir exists if we are configured with a custom path.
 	if conf.AllocMountsDir != "" {
 		if err := os.MkdirAll(conf.AllocMountsDir, 0o711); err != nil {
diff --git a/client/config/config.go b/client/config/config.go
index 1fe177432..700d8feb8 100644
--- a/client/config/config.go
+++ b/client/config/config.go
@@ -314,6 +314,10 @@ type Config struct {
 	// HostVolumes is a map of the configured host volumes by name.
 	HostVolumes map[string]*structs.ClientHostVolumeConfig

+	// HostVolumesDir is the suggested directory for plugins to put volumes.
+	// Volume plugins may ignore this suggestion, but we provide this default.
+	HostVolumesDir string
+
 	// HostVolumePluginDir is the directory with dynamic host volume plugins.
 	HostVolumePluginDir string
diff --git a/client/fingerprint/dynamic_host_volumes.go b/client/fingerprint/dynamic_host_volumes.go
index b062572f8..290733ce8 100644
--- a/client/fingerprint/dynamic_host_volumes.go
+++ b/client/fingerprint/dynamic_host_volumes.go
@@ -44,7 +44,7 @@ func (h *DynamicHostVolumePluginFingerprint) Fingerprint(request *FingerprintReq
 		return nil
 	}

-	plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir)
+	plugins, err := GetHostVolumePluginVersions(h.logger, pluginDir, request.Node.NodePool)
 	if err != nil {
 		if os.IsNotExist(err) {
 			h.logger.Debug("plugin dir does not exist", "dir", pluginDir)
@@ -74,10 +74,10 @@ func (h *DynamicHostVolumePluginFingerprint) Periodic() (bool, time.Duration) {
 	return false, 0
 }

-// GetHostVolumePluginVersions finds all the executable files on disk
-// that respond to a Version call (arg $1 = 'version' / env $OPERATION = 'version')
-// The return map's keys are plugin IDs, and the values are version strings.
-func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string]string, error) {
+// GetHostVolumePluginVersions finds all the executable files on disk that
+// respond to a `fingerprint` call. The return map's keys are plugin IDs,
+// and the values are version strings.
+func GetHostVolumePluginVersions(log hclog.Logger, pluginDir, nodePool string) (map[string]string, error) {
 	files, err := helper.FindExecutableFiles(pluginDir)
 	if err != nil {
 		return nil, err
@@ -97,7 +97,7 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string
 		log := log.With("plugin_id", file)

-		p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "")
+		p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "", nodePool)
 		if err != nil {
 			log.Warn("error getting plugin", "error", err)
 			return
diff --git a/client/host_volume_endpoint_test.go b/client/host_volume_endpoint_test.go
index 71d8082a4..a1005e210 100644
--- a/client/host_volume_endpoint_test.go
+++ b/client/host_volume_endpoint_test.go
@@ -31,7 +31,7 @@ func TestHostVolume(t *testing.T) {
 			StateMgr:       client.stateDB,
 			UpdateNodeVols: client.updateNodeFromHostVol,
 			PluginDir:      "/no/ext/plugins",
-			SharedMountDir: tmp,
+			VolumesDir:     tmp,
 		})
 		client.hostVolumeManager = manager
 		hostPathCreate := filepath.Join(tmp, "test-vol-id-1")
@@ -177,7 +177,7 @@ func TestHostVolume(t *testing.T) {
 			StateMgr:       client.stateDB,
 			UpdateNodeVols: client.updateNodeFromHostVol,
 			PluginDir:      "/no/ext/plugins",
-			SharedMountDir: "host_volume_endpoint_test.go",
+			VolumesDir:     "host_volume_endpoint_test.go",
 		})

 		req := &cstructs.ClientHostVolumeCreateRequest{
diff --git a/client/hostvolumemanager/host_volume_plugin.go b/client/hostvolumemanager/host_volume_plugin.go
index b48d7c2ff..c51336b41 100644
--- a/client/hostvolumemanager/host_volume_plugin.go
+++ b/client/hostvolumemanager/host_volume_plugin.go
@@ -23,14 +23,18 @@ import (
 const (
 	// environment variables for external plugins
+	EnvOperation   = "DHV_OPERATION"
-	EnvHostPath    = "DHV_HOST_PATH"
-	EnvNodeID      = "DHV_NODE_ID"
+	EnvVolumesDir  = "DHV_VOLUMES_DIR"
+	EnvPluginDir   = "DHV_PLUGIN_DIR"
+	EnvCreatedPath = "DHV_CREATED_PATH"
+	EnvNamespace   = "DHV_NAMESPACE"
 	EnvVolumeName  = "DHV_VOLUME_NAME"
 	EnvVolumeID    = "DHV_VOLUME_ID"
+	EnvNodeID      = "DHV_NODE_ID"
+	EnvNodePool    = "DHV_NODE_POOL"
 	EnvCapacityMin = "DHV_CAPACITY_MIN_BYTES"
 	EnvCapacityMax = "DHV_CAPACITY_MAX_BYTES"
-	EnvPluginDir   = "DHV_PLUGIN_DIR"
 	EnvParameters  = "DHV_PARAMETERS"
 )
@@ -62,10 +66,10 @@ const HostVolumePluginMkdirVersion = "0.0.1"
 var _ HostVolumePlugin = &HostVolumePluginMkdir{}

 // HostVolumePluginMkdir is a plugin that creates a directory within the
-// specified TargetPath. It is built-in to Nomad, so is always available.
+// specified VolumesDir. It is built-in to Nomad, so is always available.
 type HostVolumePluginMkdir struct {
 	ID         string
-	TargetPath string
+	VolumesDir string

 	log hclog.Logger
 }
@@ -80,7 +84,7 @@ func (p *HostVolumePluginMkdir) Fingerprint(_ context.Context) (*PluginFingerpri
 func (p *HostVolumePluginMkdir) Create(_ context.Context,
 	req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {

-	path := filepath.Join(p.TargetPath, req.ID)
+	path := filepath.Join(p.VolumesDir, req.ID)
 	log := p.log.With(
 		"operation", "create",
 		"volume_id", req.ID,
@@ -102,7 +106,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
 		return nil, err
 	}

-	err := os.Mkdir(path, 0o700)
+	err := os.MkdirAll(path, 0o700)
 	if err != nil {
 		log.Debug("error with plugin", "error", err)
 		return nil, err
@@ -113,7 +117,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
 }

 func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
-	path := filepath.Join(p.TargetPath, req.ID)
+	path := filepath.Join(p.VolumesDir, req.ID)
 	log := p.log.With(
 		"operation", "delete",
 		"volume_id", req.ID,
@@ -135,7 +139,7 @@ var _ HostVolumePlugin = &HostVolumePluginExternal{}

 // NewHostVolumePluginExternal returns an external host volume plugin
 // if the specified executable exists on disk.
 func NewHostVolumePluginExternal(log hclog.Logger,
-	pluginDir, filename, targetPath string) (*HostVolumePluginExternal, error) {
+	pluginDir, filename, volumesDir, nodePool string) (*HostVolumePluginExternal, error) {
 	// this should only be called with already-detected executables,
 	// but we'll double-check it anyway, so we can provide a tidy error message
 	// if it has changed between fingerprinting and execution.
@@ -153,8 +157,9 @@ func NewHostVolumePluginExternal(log hclog.Logger,
 	return &HostVolumePluginExternal{
 		ID:         filename,
 		Executable: executable,
-		TargetPath: targetPath,
+		VolumesDir: volumesDir,
 		PluginDir:  pluginDir,
+		NodePool:   nodePool,
 		log:        log,
 	}, nil
 }
@@ -166,16 +171,17 @@ func NewHostVolumePluginExternal(log hclog.Logger,
 type HostVolumePluginExternal struct {
 	ID         string
 	Executable string
-	TargetPath string
+	VolumesDir string
 	PluginDir  string
+	NodePool   string

 	log hclog.Logger
 }

 // Fingerprint calls the executable with the following parameters:
-// arguments: fingerprint
+// arguments: $1=fingerprint
 // environment:
-// DHV_OPERATION=fingerprint
+// - DHV_OPERATION=fingerprint
 //
 // Response should be valid JSON on stdout, with a "version" key, e.g.:
 // {"version": "0.0.1"}
@@ -205,21 +211,23 @@ func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFing
 }

 // Create calls the executable with the following parameters:
-// arguments: create {path to create}
+// arguments: $1=create
 // environment:
-// DHV_OPERATION=create
-// DHV_HOST_PATH={path to create}
-// DHV_NODE_ID={Nomad node ID}
-// DHV_VOLUME_NAME={name from the volume specification}
-// DHV_VOLUME_ID={Nomad volume ID}
-// DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec}
-// DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec}
-// DHV_PARAMETERS={json of parameters from the volume spec}
-// DHV_PLUGIN_DIR={path to directory containing plugins}
+// - DHV_OPERATION=create
+// - DHV_VOLUMES_DIR={directory to put the volume in}
+// - DHV_PLUGIN_DIR={path to directory containing plugins}
+// - DHV_NAMESPACE={volume namespace}
+// - DHV_VOLUME_NAME={name from the volume specification}
+// - DHV_VOLUME_ID={volume ID generated by Nomad}
+// - DHV_NODE_ID={Nomad node ID}
+// - DHV_NODE_POOL={Nomad node pool}
+// - DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec, expressed in bytes}
+// - DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec, expressed in bytes}
+// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
 //
 // Response should be valid JSON on stdout with "path" and "bytes", e.g.:
-// {"path": $HOST_PATH, "bytes": 50000000}
-// "path" must be provided to confirm that the requested path is what was
+// {"path": "/path/that/was/created", "bytes": 50000000}
+// "path" must be provided to confirm the requested path is what was
 // created by the plugin. "bytes" is the actual size of the volume created
 // by the plugin; if excluded, it will default to 0.
 //
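For illustration only, not part of this patch: given the contract documented above, a minimal external plugin could handle `fingerprint` and `create` roughly as in the sketch below. The plugin body and sizes are made up; the authoritative examples are the test fixture and demo scripts changed later in this patch.

    #!/usr/bin/env bash
    # sketch of a dynamic host volume plugin's fingerprint/create handling
    set -euo pipefail

    case "${1:-}" in
      fingerprint)
        # report a version so the client fingerprints this plugin
        echo '{"version": "0.0.1"}'
        ;;
      create)
        # place the volume under the directory Nomad suggests, keyed by volume ID
        target="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"
        mkdir -p "$target"
        # respond with the created path and its size in bytes
        printf '{"path": "%s", "bytes": %d}\n' "$target" "${DHV_CAPACITY_MIN_BYTES:-0}"
        ;;
    esac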
@@ -233,14 +241,22 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
 		return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
 	}
 	envVars := []string{
-		fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
+		fmt.Sprintf("%s=%s", EnvOperation, "create"),
+		fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
+		fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
+		fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
+		// values from volume spec
+		fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
 		fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
+		fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
 		fmt.Sprintf("%s=%d", EnvCapacityMin, req.RequestedCapacityMinBytes),
 		fmt.Sprintf("%s=%d", EnvCapacityMax, req.RequestedCapacityMaxBytes),
+		fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
 		fmt.Sprintf("%s=%s", EnvParameters, params),
 	}

-	stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars)
+	log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
+	stdout, _, err := p.runPlugin(ctx, log, "create", envVars)
 	if err != nil {
 		return nil, fmt.Errorf("error creating volume %q with plugin %q: %w", req.ID, p.ID, err)
 	}
@@ -253,20 +269,22 @@ ...
 		// an error here after the plugin has done who-knows-what.
 		return nil, err
 	}
-	// TODO: validate returned host path
 	return &pluginResp, nil
 }

 // Delete calls the executable with the following parameters:
-// arguments: delete {path to create}
+// arguments: $1=delete
 // environment:
-// DHV_OPERATION=delete
-// DHV_HOST_PATH={path to create}
-// DHV_NODE_ID={Nomad node ID}
-// DHV_VOLUME_NAME={name from the volume specification}
-// DHV_VOLUME_ID={Nomad volume ID}
-// DHV_PARAMETERS={json of parameters from the volume spec}
-// DHV_PLUGIN_DIR={path to directory containing plugins}
+// - DHV_OPERATION=delete
+// - DHV_CREATED_PATH={path that `create` returned}
+// - DHV_VOLUMES_DIR={directory that volumes should be put in}
+// - DHV_PLUGIN_DIR={path to directory containing plugins}
+// - DHV_NAMESPACE={volume namespace}
+// - DHV_VOLUME_NAME={name from the volume specification}
+// - DHV_VOLUME_ID={volume ID generated by Nomad}
+// - DHV_NODE_ID={Nomad node ID}
+// - DHV_NODE_POOL={Nomad node pool}
+// - DHV_PARAMETERS={stringified json of parameters from the volume spec}
 //
 // Response on stdout is discarded.
 //
@@ -280,42 +298,38 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
 		return fmt.Errorf("error marshaling volume pramaters: %w", err)
 	}
 	envVars := []string{
-		fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
+		fmt.Sprintf("%s=%s", EnvOperation, "delete"),
+		fmt.Sprintf("%s=%s", EnvVolumesDir, p.VolumesDir),
+		fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
+		fmt.Sprintf("%s=%s", EnvNodePool, p.NodePool),
+		// from create response
+		fmt.Sprintf("%s=%s", EnvCreatedPath, req.HostPath),
+		// values from volume spec
+		fmt.Sprintf("%s=%s", EnvNamespace, req.Namespace),
 		fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
+		fmt.Sprintf("%s=%s", EnvVolumeID, req.ID),
+		fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
 		fmt.Sprintf("%s=%s", EnvParameters, params),
 	}

-	_, _, err = p.runPlugin(ctx, "delete", req.ID, envVars)
+	log := p.log.With("volume_name", req.Name, "volume_id", req.ID)
+	_, _, err = p.runPlugin(ctx, log, "delete", envVars)
 	if err != nil {
 		return fmt.Errorf("error deleting volume %q with plugin %q: %w", req.ID, p.ID, err)
 	}
 	return nil
 }
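Likewise illustrative rather than part of the patch: a plugin's `delete` handling under this contract can simply remove the path it previously returned from `create`, which Nomad hands back via DHV_CREATED_PATH.

    #!/usr/bin/env bash
    # sketch of the documented "delete" operation
    set -euo pipefail

    if [ "${1:-}" = 'delete' ]; then
      # DHV_CREATED_PATH is the path this plugin returned from "create";
      # refuse to remove anything if it is unset or empty
      test -n "${DHV_CREATED_PATH:-}"
      rm -rf "$DHV_CREATED_PATH"
    fi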
-// runPlugin executes the... executable with these additional env vars:
-// DHV_OPERATION={op}
-// DHV_HOST_PATH={path to create}
-// DHV_VOLUME_ID={Nomad volume ID}
-// DHV_PLUGIN_DIR={path to directory containing plugins}
-func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
-	op, volID string, env []string) (stdout, stderr []byte, err error) {
+// runPlugin executes the... executable
+func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, log hclog.Logger,
+	op string, env []string) (stdout, stderr []byte, err error) {

-	path := filepath.Join(p.TargetPath, volID)
-	log := p.log.With(
-		"operation", op,
-		"volume_id", volID,
-		"path", path)
+	log = log.With("operation", op)
 	log.Debug("running plugin")

 	// set up plugin execution
-	cmd := exec.CommandContext(ctx, p.Executable, op, path)
-
-	cmd.Env = append([]string{
-		fmt.Sprintf("%s=%s", EnvOperation, op),
-		fmt.Sprintf("%s=%s", EnvHostPath, path),
-		fmt.Sprintf("%s=%s", EnvVolumeID, volID),
-		fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
-	}, env...)
+	cmd := exec.CommandContext(ctx, p.Executable, op)
+	cmd.Env = env

 	stdout, stderr, err = runCommand(cmd)
diff --git a/client/hostvolumemanager/host_volume_plugin_test.go b/client/hostvolumemanager/host_volume_plugin_test.go
index 07709b564..826f7b943 100644
--- a/client/hostvolumemanager/host_volume_plugin_test.go
+++ b/client/hostvolumemanager/host_volume_plugin_test.go
@@ -21,7 +21,7 @@ func TestHostVolumePluginMkdir(t *testing.T) {
 	plug := &HostVolumePluginMkdir{
 		ID:         "test-mkdir-plugin",
-		TargetPath: tmp,
+		VolumesDir: tmp,
 		log:        testlog.HCLogger(t),
 	}
@@ -58,7 +58,7 @@ func TestHostVolumePluginMkdir(t *testing.T) {
 	t.Run("sad", func(t *testing.T) {
 		// can't mkdir inside a file
-		plug.TargetPath = "host_volume_plugin_test.go"
+		plug.VolumesDir = "host_volume_plugin_test.go"

 		resp, err := plug.Create(timeout(t),
 			&cstructs.ClientHostVolumeCreateRequest{
@@ -79,23 +79,25 @@ func TestNewHostVolumePluginExternal(t *testing.T) {
 	log := testlog.HCLogger(t)
 	var err error

-	_, err = NewHostVolumePluginExternal(log, ".", "non-existent", "target")
+	_, err = NewHostVolumePluginExternal(log, ".", "non-existent", "target", "")
 	must.ErrorIs(t, err, ErrPluginNotExists)

-	_, err = NewHostVolumePluginExternal(log, ".", "host_volume_plugin_test.go", "target")
+	_, err = NewHostVolumePluginExternal(log, ".", "host_volume_plugin_test.go", "target", "")
 	must.ErrorIs(t, err, ErrPluginNotExecutable)

 	t.Run("unix", func(t *testing.T) {
 		if runtime.GOOS == "windows" {
 			t.Skip("skipped because windows") // db TODO(1.10.0)
 		}
-		p, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin.sh", "test-target")
+		p, err := NewHostVolumePluginExternal(log,
+			"./test_fixtures", "test_plugin.sh", "test-target", "test-pool")
 		must.NoError(t, err)
 		must.Eq(t, &HostVolumePluginExternal{
 			ID:         "test_plugin.sh",
 			Executable: "test_fixtures/test_plugin.sh",
-			TargetPath: "test-target",
+			VolumesDir: "test-target",
 			PluginDir:  "./test_fixtures",
+			NodePool:   "test-pool",
 			log:        log,
 		}, p)
 	})
@@ -116,7 +118,8 @@ func TestHostVolumePluginExternal(t *testing.T) {
 	t.Run("happy", func(t *testing.T) {
 		log, getLogs := logRecorder(t)
-		plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin.sh", tmp)
+		plug, err := NewHostVolumePluginExternal(log,
+			"./test_fixtures", "test_plugin.sh", tmp, "test-node-pool")
 		must.NoError(t, err)

 		// fingerprint
@@ -130,6 +133,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
 			&cstructs.ClientHostVolumeCreateRequest{
 				Name:                      "test-vol-name",
 				ID:                        volID,
+				Namespace:                 "test-namespace",
 				NodeID:                    "test-node",
 				RequestedCapacityMinBytes: 5,
 				RequestedCapacityMaxBytes: 10,
@@ -151,6 +155,8 @@ func TestHostVolumePluginExternal(t *testing.T) {
 			&cstructs.ClientHostVolumeDeleteRequest{
 				Name:       "test-vol-name",
 				ID:         volID,
+				HostPath:   resp.Path,
+				Namespace:  "test-namespace",
 				NodeID:     "test-node",
 				Parameters: map[string]string{"key": "val"},
 			})
@@ -164,7 +170,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
 	t.Run("sad", func(t *testing.T) {
 		log, getLogs := logRecorder(t)
-		plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin_sad.sh", tmp)
+		plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin_sad.sh", tmp, "")
 		must.NoError(t, err)

 		v, err := plug.Fingerprint(timeout(t))
@@ -180,11 +186,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
 		resp, err := plug.Create(timeout(t),
 			&cstructs.ClientHostVolumeCreateRequest{
-				ID:                        volID,
-				NodeID:                    "test-node",
-				RequestedCapacityMinBytes: 5,
-				RequestedCapacityMaxBytes: 10,
-				Parameters:                map[string]string{"key": "val"},
+				ID: volID,
 			})
 		must.EqError(t, err, `error creating volume "test-vol-id" with plugin "test_plugin_sad.sh": exit status 1`)
 		must.Nil(t, resp)
@@ -197,9 +199,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
 		err = plug.Delete(timeout(t),
 			&cstructs.ClientHostVolumeDeleteRequest{
-				ID:         volID,
-				NodeID:     "test-node",
-				Parameters: map[string]string{"key": "val"},
+				ID: volID,
 			})
 		must.EqError(t, err, `error deleting volume "test-vol-id" with plugin "test_plugin_sad.sh": exit status 1`)
 		logged = getLogs()
diff --git a/client/hostvolumemanager/host_volumes.go b/client/hostvolumemanager/host_volumes.go
index dcbbd2b02..7b4ce33d9 100644
--- a/client/hostvolumemanager/host_volumes.go
+++ b/client/hostvolumemanager/host_volumes.go
@@ -35,9 +35,12 @@ type Config struct {
 	// PluginDir is where external plugins may be found.
 	PluginDir string

-	// SharedMountDir is where plugins should place the directory
-	// that will later become a volume HostPath
-	SharedMountDir string
+	// VolumesDir is where plugins should place the directory
+	// that will later become a volume's HostPath
+	VolumesDir string
+
+	// NodePool is passed into external plugin execution environment.
+	NodePool string

 	// StateMgr manages client state to restore on agent restarts.
 	StateMgr HostVolumeStateManager
@@ -51,7 +54,8 @@ type Config struct {
 // and registers volumes with the client node.
 type HostVolumeManager struct {
 	pluginDir      string
-	sharedMountDir string
+	volumesDir     string
+	nodePool       string
 	stateMgr       HostVolumeStateManager
 	updateNodeVols HostVolumeNodeUpdater
 	builtIns       map[string]HostVolumePlugin
@@ -64,13 +68,14 @@ func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager
 	logger = logger.Named("host_volume_manager")
 	return &HostVolumeManager{
 		pluginDir:      config.PluginDir,
-		sharedMountDir: config.SharedMountDir,
+		volumesDir:     config.VolumesDir,
+		nodePool:       config.NodePool,
 		stateMgr:       config.StateMgr,
 		updateNodeVols: config.UpdateNodeVols,
 		builtIns: map[string]HostVolumePlugin{
 			HostVolumePluginMkdirID: &HostVolumePluginMkdir{
 				ID:         HostVolumePluginMkdirID,
-				TargetPath: config.SharedMountDir,
+				VolumesDir: config.VolumesDir,
 				log:        logger.With("plugin_id", HostVolumePluginMkdirID),
 			},
 		},
@@ -84,13 +89,16 @@ func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager
 func (hvm *HostVolumeManager) Create(ctx context.Context,
 	req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {

+	log := hvm.log.With("volume_name", req.Name, "volume_id", req.ID)
+
 	plug, err := hvm.getPlugin(req.PluginID)
 	if err != nil {
 		return nil, err
 	}

 	// can't have two of the same volume name w/ different IDs per client node
-	if err := hvm.locker.lock(req.Name, req.ID); err != nil {
+	isNewVolume, err := hvm.locker.lock(req.Name, req.ID)
+	if err != nil {
 		return nil, err
 	}
@@ -106,22 +114,26 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
 		CreateReq: req,
 	}
 	if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
-		// if we fail to write to state, delete the volume so it isn't left
-		// lying around without Nomad knowing about it.
-		hvm.log.Error("failed to save volume in state, so deleting", "volume_id", req.ID, "error", err)
-		delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{
-			ID:         req.ID,
-			PluginID:   req.PluginID,
-			NodeID:     req.NodeID,
-			HostPath:   hvm.sharedMountDir,
-			Parameters: req.Parameters,
-		})
-		if delErr != nil {
-			hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr)
-			err = multierror.Append(err, delErr)
+		// if we fail to write to state on initial create,
+		// delete the volume so it isn't left lying around
+		// without Nomad knowing about it.
+		log.Error("failed to save volume in client state", "error", err)
+		if isNewVolume {
+			log.Error("initial create detected, running delete")
+			delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{
+				ID:         req.ID,
+				PluginID:   req.PluginID,
+				NodeID:     req.NodeID,
+				HostPath:   hvm.volumesDir,
+				Parameters: req.Parameters,
+			})
+			if delErr != nil {
+				log.Warn("error deleting volume after state store failure", "error", delErr)
+				err = multierror.Append(err, delErr)
+			}
+			// free up the volume name whether delete succeeded or not.
+			hvm.locker.release(req.Name)
 		}
-		// free up the volume name whether delete succeeded or not.
-		hvm.locker.release(req.Name)
 		return nil, err
 	}
@@ -142,7 +154,7 @@ func (hvm *HostVolumeManager) Register(ctx context.Context,
 	req *cstructs.ClientHostVolumeRegisterRequest) error {

 	// can't have two of the same volume name w/ different IDs per client node
-	if err := hvm.locker.lock(req.Name, req.ID); err != nil {
+	if _, err := hvm.locker.lock(req.Name, req.ID); err != nil {
 		return err
 	}
@@ -216,7 +228,7 @@ func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
 		return plug, nil
 	}
 	log := hvm.log.With("plugin_id", id)
-	return NewHostVolumePluginExternal(log, hvm.pluginDir, id, hvm.sharedMountDir)
+	return NewHostVolumePluginExternal(log, hvm.pluginDir, id, hvm.volumesDir, hvm.nodePool)
 }

 // restoreFromState loads all volumes from client state and runs Create for
@@ -270,7 +282,7 @@ func (hvm *HostVolumeManager) restoreForCreate(ctx context.Context, vol *cstruct
 	}

 	// lock the name so future creates can't produce duplicates.
-	err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
+	_, err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
 	// state should never have duplicate vol names, and restore happens
 	// prior to node registration, so new creates shouldn't come in
 	// concurrently, but check for error just in case.
@@ -299,7 +311,7 @@ func (hvm *HostVolumeManager) restoreForCreate(ctx context.Context, vol *cstruct
 // Register, by converting the stored struct. It otherwise behaves the same as
 // restoreForCreate.
 func (hvm *HostVolumeManager) restoreForRegister(vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) {
-	err := hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
+	_, err := hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
 	if err != nil {
 		hvm.log.Error("error during restore",
 			"volume_name", vol.CreateReq.Name,
@@ -340,20 +352,20 @@ type volLocker struct {
 	locks sync.Map
 }

-// lock the provided name, error if it was already locked with a different ID
-func (l *volLocker) lock(name, id string) error {
+// lock the provided name, return true if it was not already locked,
+// and error if it was already locked with a different ID.
+func (l *volLocker) lock(name, id string) (bool, error) {
 	current, exists := l.locks.LoadOrStore(name, id)
 	if exists && id != current.(string) {
-		return ErrVolumeNameExists
+		return false, ErrVolumeNameExists
 	}
-	return nil
+	return !exists, nil
 }

 func (l *volLocker) release(name string) {
 	l.locks.Delete(name)
 }

-// only used in tests to assert lock state
 func (l *volLocker) isLocked(name string) bool {
 	_, locked := l.locks.Load(name)
 	return locked
diff --git a/client/hostvolumemanager/host_volumes_test.go b/client/hostvolumemanager/host_volumes_test.go
index 9d553fd6f..35a3e95b3 100644
--- a/client/hostvolumemanager/host_volumes_test.go
+++ b/client/hostvolumemanager/host_volumes_test.go
@@ -29,17 +29,17 @@ func TestHostVolumeManager(t *testing.T) {
 	memDB := cstate.NewMemDB(log)
 	node := newFakeNode(t)

-	hostPathCreate := t.TempDir()
-	hostPathRegister := t.TempDir()
+	volumesDirCreate := t.TempDir()
+	volumesDirRegister := t.TempDir()

 	hvm := NewHostVolumeManager(log, Config{
 		PluginDir:      "./test_fixtures",
-		SharedMountDir: hostPathCreate,
+		VolumesDir:     volumesDirCreate,
 		StateMgr:       errDB,
 		UpdateNodeVols: node.updateVol,
 	})

-	plug := &fakePlugin{mountDir: hostPathCreate}
+	plug := &fakePlugin{volsDir: volumesDirCreate}
 	hvm.builtIns["test-plugin"] = plug

 	ctx := timeout(t)
@@ -87,7 +87,7 @@ func TestHostVolumeManager(t *testing.T) {
 		expectResp := &cstructs.ClientHostVolumeCreateResponse{
 			VolumeName:    "created-volume",
 			VolumeID:      "vol-id-1",
-			HostPath:      hostPathCreate,
+			HostPath:      filepath.Join(volumesDirCreate, "vol-id-1"),
 			CapacityBytes: 5,
 		}
 		must.Eq(t, expectResp, resp)
@@ -108,6 +108,15 @@ func TestHostVolumeManager(t *testing.T) {
 		must.NoError(t, err)
 		must.Eq(t, expectResp, resp)

+		// error saving state on restore/update should not run delete
+		hvm.stateMgr = errDB
+		resp, err = hvm.Create(ctx, req)
+		must.ErrorIs(t, err, cstate.ErrDBError)
+		must.Nil(t, resp)
+		must.Eq(t, "", plug.deleted)
+		plug.reset()
+		hvm.stateMgr = memDB
+
 		// duplicate create with the same vol name but different ID should fail
 		_, err = hvm.Create(ctx, &cstructs.ClientHostVolumeCreateRequest{
 			Name: name,
@@ -122,7 +131,7 @@ func TestHostVolumeManager(t *testing.T) {
 		req := &cstructs.ClientHostVolumeRegisterRequest{
 			ID:            "vol-id-2",
 			Name:          name,
-			HostPath:      hostPathRegister,
+			HostPath:      volumesDirRegister,
 			CapacityBytes: 1000,
 		}
 		err := hvm.Register(ctx, req)
@@ -182,7 +191,7 @@ func TestHostVolumeManager(t *testing.T) {
 		must.Eq(t, VolumeMap{
 			"registered-volume": &structs.ClientHostVolumeConfig{
 				Name: "registered-volume",
-				Path: hostPathRegister,
+				Path: volumesDirRegister,
 				ID:   "vol-id-2",
 			},
 		}, node.vols, must.Sprint("created-volume should be deleted from node"))
@@ -208,7 +217,7 @@ func TestHostVolumeManager(t *testing.T) {
 }

 type fakePlugin struct {
-	mountDir        string
+	volsDir         string
 	created         string
 	deleted         string
 	fingerprintErr  error
@@ -236,7 +245,7 @@ func (p *fakePlugin) Create(_ context.Context, req *cstructs.ClientHostVolumeCre
 	}
 	p.created = req.ID
 	return &HostVolumePluginCreateResponse{
-		Path:      p.mountDir,
+		Path:      filepath.Join(p.volsDir, req.ID),
 		SizeBytes: req.RequestedCapacityMinBytes,
 	}, nil
 }
@@ -302,14 +311,14 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {

 	// new volume manager should load it from state and run Create,
 	// resulting in a volume directory in this mountDir.
-	mountDir := t.TempDir()
-	volPath := filepath.Join(mountDir, vol1.ID)
+	volsDir := t.TempDir()
+	volPath := filepath.Join(volsDir, vol1.ID)

 	hvm := NewHostVolumeManager(log, Config{
 		StateMgr:       state,
 		UpdateNodeVols: node.updateVol,
 		PluginDir:      "/wherever",
-		SharedMountDir: mountDir,
+		VolumesDir:     volsDir,
 	})

 	vols, err := hvm.restoreFromState(timeout(t))
diff --git a/client/hostvolumemanager/test_fixtures/test_plugin.sh b/client/hostvolumemanager/test_fixtures/test_plugin.sh
index 6e15aec21..115cfede8 100755
--- a/client/hostvolumemanager/test_fixtures/test_plugin.sh
+++ b/client/hostvolumemanager/test_fixtures/test_plugin.sh
@@ -11,41 +11,39 @@ test "$1" == "$DHV_OPERATION"

 echo 'all operations should ignore stderr' 1>&2

-# since we will be running `rm -rf` (frightening),
-# check to make sure DHV_HOST_PATH has a uuid shape in it.
-# Nomad generates a volume ID and includes it in the path.
-validate_path() {
-  if [[ ! "$DHV_HOST_PATH" =~ "test-vol-id" ]]; then
-    1>&2 echo "expected test vol ID in the DHV_HOST_PATH; got: '$DHV_HOST_PATH'"
-    exit 1
-  fi
-}
-
 case $1 in
   fingerprint)
     echo '{"version": "0.0.2"}' ;;
   create)
-    test "$2" == "$DHV_HOST_PATH"
-    test "$DHV_NODE_ID" == 'test-node'
     test "$DHV_VOLUME_NAME" == 'test-vol-name'
     test "$DHV_VOLUME_ID" == 'test-vol-id'
-    test "$DHV_PARAMETERS" == '{"key":"val"}'
+    test "$DHV_NAMESPACE" == 'test-namespace'
     test "$DHV_CAPACITY_MIN_BYTES" -eq 5
     test "$DHV_CAPACITY_MAX_BYTES" -eq 10
+    test "$DHV_NODE_ID" == 'test-node'
+    test "$DHV_NODE_POOL" == 'test-node-pool'
+    test "$DHV_PARAMETERS" == '{"key":"val"}'
     test "$DHV_PLUGIN_DIR" == './test_fixtures'
-    validate_path "$DHV_HOST_PATH"
-    mkdir "$2"
-    printf '{"path": "%s", "bytes": 5, "context": %s}' "$2" "$DHV_PARAMETERS"
+    test -d "$DHV_VOLUMES_DIR"
+    target="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"
+    test "$target" != '/'
+    mkdir -p "$target"
+    printf '{"path": "%s", "bytes": 5}' "$target"
    ;;
   delete)
-    test "$2" == "$DHV_HOST_PATH"
     test "$DHV_NODE_ID" == 'test-node'
+    test "$DHV_NODE_POOL" == 'test-node-pool'
+    test "$DHV_NAMESPACE" == 'test-namespace'
     test "$DHV_VOLUME_NAME" == 'test-vol-name'
     test "$DHV_VOLUME_ID" == 'test-vol-id'
     test "$DHV_PARAMETERS" == '{"key":"val"}'
     test "$DHV_PLUGIN_DIR" == './test_fixtures'
-    validate_path "$DHV_HOST_PATH"
-    rm -rfv "$2" ;;
+    test -d "$DHV_VOLUMES_DIR"
+    target="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"
+    test "$target" != '/'
+    test "$DHV_CREATED_PATH" == "$target"
+    rm -rfv "$target"
+    ;;
   *)
    echo "unknown operation $1"
    exit 1 ;;
diff --git a/client/hostvolumemanager/volume_fingerprint_test.go b/client/hostvolumemanager/volume_fingerprint_test.go
index ce9ea4295..06c8c80f1 100644
--- a/client/hostvolumemanager/volume_fingerprint_test.go
+++ b/client/hostvolumemanager/volume_fingerprint_test.go
@@ -4,6 +4,7 @@ package hostvolumemanager

 import (
+	"path/filepath"
 	"testing"

 	"github.com/hashicorp/nomad/client/state"
@@ -106,11 +107,11 @@ func TestWaitForFirstFingerprint(t *testing.T) {
 	node := newFakeNode(t)
 	hvm := NewHostVolumeManager(log, Config{
 		PluginDir:      "",
-		SharedMountDir: tmp,
+		VolumesDir:     tmp,
 		StateMgr:       memDB,
 		UpdateNodeVols: node.updateVol,
 	})
-	plug := &fakePlugin{mountDir: tmp}
+	plug := &fakePlugin{volsDir: tmp}
 	hvm.builtIns = map[string]HostVolumePlugin{
 		"test-plugin": plug,
 	}
@@ -136,7 +137,7 @@ func TestWaitForFirstFingerprint(t *testing.T) {
 		"vol-name": &structs.ClientHostVolumeConfig{
 			Name:     "vol-name",
 			ID:       "vol-id",
-			Path:     tmp,
+			Path:     filepath.Join(tmp, "vol-id"),
 			ReadOnly: false,
 		},
 	}, node.vols)
diff --git a/client/structs/host_volumes.go b/client/structs/host_volumes.go
index 2e5121dde..3e97e9c99 100644
--- a/client/structs/host_volumes.go
+++ b/client/structs/host_volumes.go
@@ -22,6 +22,10 @@ type ClientHostVolumeCreateRequest struct {
 	// built-in plugin.
 	PluginID string

+	// Namespace is the Nomad namespace for the volume.
+	// It's in the client RPC to be included in plugin execution environment.
+	Namespace string
+
 	// NodeID is the node where the volume is placed. It's included in the
 	// client RPC request so that the server can route the request to the
 	// correct node.
@@ -88,13 +92,17 @@ type ClientHostVolumeDeleteRequest struct {
 	// built-in plugin.
 	PluginID string

+	// Namespace is the Nomad namespace for the volume.
+	// It's in the client RPC to be included in plugin execution environment.
+	Namespace string
+
 	// NodeID is the node where the volume is placed. It's included in the
 	// client RPC request so that the server can route the request to the
 	// correct node.
 	NodeID string

-	// Path is the host path where the volume's mount point was created. We send
-	// this from the server to allow verification by plugins
+	// HostPath is the host path where the volume's mount point was created.
+	// We send this from the server to allow verification by plugins.
 	HostPath string

 	// Parameters are an opaque map of parameters for the host volume plugin.
diff --git a/command/agent/agent.go b/command/agent/agent.go
index 40d467e23..b83ff2229 100644
--- a/command/agent/agent.go
+++ b/command/agent/agent.go
@@ -724,6 +724,7 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
 	if agentConfig.DataDir != "" {
 		conf.StateDir = filepath.Join(agentConfig.DataDir, "client")
 		conf.AllocDir = filepath.Join(agentConfig.DataDir, "alloc")
+		conf.HostVolumesDir = filepath.Join(agentConfig.DataDir, "host_volumes")
 		conf.HostVolumePluginDir = filepath.Join(agentConfig.DataDir, "host_volume_plugins")
 		dataParent := filepath.Dir(agentConfig.DataDir)
 		conf.AllocMountsDir = filepath.Join(dataParent, "alloc_mounts")
@@ -740,6 +741,9 @@ func convertClientConfig(agentConfig *Config) (*clientconfig.Config, error) {
 	if agentConfig.Client.HostVolumePluginDir != "" {
 		conf.HostVolumePluginDir = agentConfig.Client.HostVolumePluginDir
 	}
+	if agentConfig.Client.HostVolumesDir != "" {
+		conf.HostVolumesDir = agentConfig.Client.HostVolumesDir
+	}
 	if agentConfig.Client.NetworkInterface != "" {
 		conf.NetworkInterface = agentConfig.Client.NetworkInterface
 	}
diff --git a/command/agent/command.go b/command/agent/command.go
index 4284d1e22..5ef74c4eb 100644
--- a/command/agent/command.go
+++ b/command/agent/command.go
@@ -108,6 +108,7 @@ func (c *Command) readConfig() *Config {
 	flags.StringVar(&cmdConfig.Client.StateDir, "state-dir", "", "")
 	flags.StringVar(&cmdConfig.Client.AllocDir, "alloc-dir", "", "")
 	flags.StringVar(&cmdConfig.Client.AllocMountsDir, "alloc-mounts-dir", "", "")
+	flags.StringVar(&cmdConfig.Client.HostVolumesDir, "host-volumes-dir", "", "")
 	flags.StringVar(&cmdConfig.Client.HostVolumePluginDir, "host-volume-plugin-dir", "", "")
 	flags.StringVar(&cmdConfig.Client.NodeClass, "node-class", "", "")
 	flags.StringVar(&cmdConfig.Client.NodePool, "node-pool", "", "")
@@ -386,6 +387,7 @@ func (c *Command) IsValidConfig(config, cmdConfig *Config) bool {
 		"plugin-dir":             config.PluginDir,
 		"alloc-dir":              config.Client.AllocDir,
 		"alloc-mounts-dir":       config.Client.AllocMountsDir,
+		"host-volumes-dir":       config.Client.HostVolumesDir,
 		"host-volume-plugin-dir": config.Client.HostVolumePluginDir,
 		"state-dir":              config.Client.StateDir,
 	}
@@ -1563,6 +1565,10 @@ Client Options:
     The default speed for network interfaces in MBits if the link speed can
     not be determined dynamically.

+  -host-volumes-dir
+    Directory wherein host volume plugins should place volumes. The default is
+    <data-dir>/host_volumes.
+
   -host-volume-plugin-dir
     Directory containing dynamic host volume plugins. The default is
     <data-dir>/host_volume_plugins.
diff --git a/command/agent/config.go b/command/agent/config.go
index 28a6afb67..0e0f82f46 100644
--- a/command/agent/config.go
+++ b/command/agent/config.go
@@ -229,6 +229,10 @@ type ClientConfig struct {
 	// AllocMountsDir is the directory for storing mounts into allocation data
 	AllocMountsDir string `hcl:"alloc_mounts_dir"`

+	// HostVolumesDir is the suggested directory for plugins to put volumes.
+	// Volume plugins may ignore this suggestion, but we provide this default.
+	HostVolumesDir string `hcl:"host_volumes_dir"`
+
 	// HostVolumePluginDir directory contains dynamic host volume plugins
 	HostVolumePluginDir string `hcl:"host_volume_plugin_dir"`
@@ -2319,6 +2323,9 @@ func (a *ClientConfig) Merge(b *ClientConfig) *ClientConfig {
 	if b.AllocMountsDir != "" {
 		result.AllocMountsDir = b.AllocMountsDir
 	}
+	if b.HostVolumesDir != "" {
+		result.HostVolumesDir = b.HostVolumesDir
+	}
 	if b.HostVolumePluginDir != "" {
 		result.HostVolumePluginDir = b.HostVolumePluginDir
 	}
diff --git a/demo/hostvolume/_test-plugin.sh b/demo/hostvolume/_test-plugin.sh
index 830af0fb0..f465d75c2 100755
--- a/demo/hostvolume/_test-plugin.sh
+++ b/demo/hostvolume/_test-plugin.sh
@@ -40,7 +40,7 @@ fi

 plugin="$1"
 op="$2"
-alloc_mounts="${3:-/tmp}"
+volumes_dir="${3:-/tmp}"
 uuid="${4:-74564d17-ce50-0bc1-48e5-6feaa41ede48}"
 node_id='0b62d807-6101-a80f-374d-e1c430abbf47'
 plugin_dir="$(dirname "$plugin")"
@@ -52,9 +52,9 @@ case $op in
    ;;
   create)
-    args="create $alloc_mounts/$uuid"
+    args='create'
     export DHV_OPERATION='create'
-    export DHV_HOST_PATH="$alloc_mounts/$uuid"
+    export DHV_VOLUMES_DIR="$volumes_dir"
     export DHV_VOLUME_NAME=test
     export DHV_VOLUME_ID="$uuid"
     export DHV_NODE_ID="$node_id"
@@ -65,9 +65,9 @@ case $op in
    ;;
   delete)
-    args="delete $alloc_mounts/$uuid"
+    args='delete'
     export DHV_OPERATION='delete'
-    export DHV_HOST_PATH="$alloc_mounts/$uuid"
+    export DHV_VOLUMES_DIR="$volumes_dir"
     export DHV_NODE_ID="$node_id"
     export DHV_VOLUME_NAME=test
     export DHV_VOLUME_ID="$uuid"
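As a usage note, not part of the patch: the demo harness above takes the plugin path, the operation, and now an optional volumes directory and volume ID, so a plugin can be exercised locally along these lines (paths are illustrative):

    # arguments: <plugin> <operation> [volumes_dir] [uuid]
    ./demo/hostvolume/_test-plugin.sh ./demo/hostvolume/example-plugin-mkfs create /tmp/volumes
    ./demo/hostvolume/_test-plugin.sh ./demo/hostvolume/example-plugin-mkfs delete /tmp/volumes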
diff --git a/demo/hostvolume/example-plugin-mkfs b/demo/hostvolume/example-plugin-mkfs
index 69aa84db9..33a3e5742 100755
--- a/demo/hostvolume/example-plugin-mkfs
+++ b/demo/hostvolume/example-plugin-mkfs
@@ -45,10 +45,6 @@ for arg in "$@"; do
   esac
 done

-# path is required for everything else
-[ $# -lt 2 ] && { echo 'path required; seek --help' 1>&2; exit 1; }
-host_path="$2"
-
 # OS detect
 if [[ "$OSTYPE" == "linux-"* ]]; then
   ext=ext4
@@ -83,7 +79,7 @@ fi
 validate_path() {
   local path="$1"
   if [[ ! "$path" =~ [0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12} ]]; then
-    1>&2 echo "expected uuid-lookin ID in the HOST_PATH; got: '$path'"
+    1>&2 echo "expected uuid-lookin ID in target host path; got: '$path'"
     return 1
   fi
 }
@@ -100,6 +96,7 @@ create_volume() {
     # translate to mb for dd block size
     local megs=$((bytes / 1024 / 1024)) # lazy, approximate

+    mkdir -p "$(dirname "$path")"
     # the extra conditionals are for idempotency
     if [ ! -f "$path.$ext" ]; then
       mkfsExec "$path" $megs
@@ -118,6 +115,7 @@ delete_volume() {
   rm -f "$path"."$ext"
 }

+host_path="$DHV_VOLUMES_DIR/$DHV_VOLUME_ID"
 case "$1" in
   "create")
     create_volume "$host_path" "$DHV_CAPACITY_MIN_BYTES"
diff --git a/nomad/host_volume_endpoint.go b/nomad/host_volume_endpoint.go
index f8ae36b24..aa8e7ced7 100644
--- a/nomad/host_volume_endpoint.go
+++ b/nomad/host_volume_endpoint.go
@@ -463,6 +463,7 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
 		ID:                        vol.ID,
 		Name:                      vol.Name,
 		PluginID:                  vol.PluginID,
+		Namespace:                 vol.Namespace,
 		NodeID:                    vol.NodeID,
 		RequestedCapacityMinBytes: vol.RequestedCapacityMinBytes,
 		RequestedCapacityMaxBytes: vol.RequestedCapacityMaxBytes,
@@ -696,6 +697,7 @@ func (v *HostVolume) deleteVolume(vol *structs.HostVolume) error {
 		ID:         vol.ID,
 		Name:       vol.Name,
 		PluginID:   vol.PluginID,
+		Namespace:  vol.Namespace,
 		NodeID:     vol.NodeID,
 		HostPath:   vol.HostPath,
 		Parameters: vol.Parameters,
diff --git a/website/content/docs/configuration/client.mdx b/website/content/docs/configuration/client.mdx
index 1201001f5..de740eb31 100644
--- a/website/content/docs/configuration/client.mdx
+++ b/website/content/docs/configuration/client.mdx
@@ -206,6 +206,18 @@ client {
 - `host_volume` ([host_volume](#host_volume-block): nil) - Exposes
   paths from the host as volumes that can be mounted into jobs.

+- `host_volumes_dir` `(string: "")` - Specifies the directory wherein
+  host volume plugins should place volumes. When this parameter is empty, Nomad
+  generates the path using the [top-level `data_dir`][top_level_data_dir]
+  suffixed with `host_volumes`, like `"/opt/nomad/host_volumes"`.
+  This must be an absolute path.
+
+- `host_volume_plugin_dir` `(string: "")` - Specifies the directory to find host
+  volume plugins. When this parameter is empty, Nomad generates the path
+  using the [top-level `data_dir`][top_level_data_dir] suffixed with
+  `host_volume_plugins`, like `"/opt/nomad/host_volume_plugins"`. This must be
+  an absolute path.
+
 - `host_network` ([host_network](#host_network-block): nil) - Registers
   additional host networks with the node that can be selected when port
   mapping.
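As a usage note, not part of the patch: the new directory can be set either with the `host_volumes_dir` client configuration documented above or with the equivalent agent flag added in this change; the paths below are only examples.

    # start a client agent with explicit dynamic host volume directories
    nomad agent -config=client.hcl \
      -host-volumes-dir=/opt/nomad/host_volumes \
      -host-volume-plugin-dir=/opt/nomad/host_volume_plugins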