dynamic host volumes: plugin spec tweaks (#24848)

* prefix plugin env vars with DHV_
* add env: DHV_VOLUME_ID, DHV_PLUGIN_DIR
* 5s timeout on fingerprint calls
This commit is contained in:
Daniel Bennett
2025-01-13 15:18:10 -05:00
committed by GitHub
parent 203a6533bb
commit 985eb53c65
8 changed files with 144 additions and 88 deletions

View File

@@ -87,9 +87,9 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string
mut := sync.Mutex{}
var wg sync.WaitGroup
for file, fullPath := range files {
for file := range files {
wg.Add(1)
go func(file, fullPath string) {
go func(file string) {
defer wg.Done()
// really should take way less than a second
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
@@ -97,7 +97,7 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string
log := log.With("plugin_id", file)
p, err := hvm.NewHostVolumePluginExternal(log, file, fullPath, "")
p, err := hvm.NewHostVolumePluginExternal(log, pluginDir, file, "")
if err != nil {
log.Warn("error getting plugin", "error", err)
return
@@ -112,7 +112,7 @@ func GetHostVolumePluginVersions(log hclog.Logger, pluginDir string) (map[string
mut.Lock()
plugins[file] = fprint.Version.String()
mut.Unlock()
}(file, fullPath)
}(file)
}
wg.Wait()

View File

@@ -12,6 +12,7 @@ import (
"os"
"os/exec"
"path/filepath"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
@@ -20,6 +21,19 @@ import (
"github.com/hashicorp/nomad/helper"
)
const (
// environment variables for external plugins
EnvOperation = "DHV_OPERATION"
EnvHostPath = "DHV_HOST_PATH"
EnvNodeID = "DHV_NODE_ID"
EnvVolumeName = "DHV_VOLUME_NAME"
EnvVolumeID = "DHV_VOLUME_ID"
EnvCapacityMin = "DHV_CAPACITY_MIN_BYTES"
EnvCapacityMax = "DHV_CAPACITY_MAX_BYTES"
EnvPluginDir = "DHV_PLUGIN_DIR"
EnvParameters = "DHV_PARAMETERS"
)
// HostVolumePlugin manages the lifecycle of volumes.
type HostVolumePlugin interface {
Fingerprint(ctx context.Context) (*PluginFingerprint, error)
@@ -121,24 +135,26 @@ var _ HostVolumePlugin = &HostVolumePluginExternal{}
// NewHostVolumePluginExternal returns an external host volume plugin
// if the specified executable exists on disk.
func NewHostVolumePluginExternal(log hclog.Logger,
id, executable, targetPath string) (*HostVolumePluginExternal, error) {
pluginDir, filename, targetPath string) (*HostVolumePluginExternal, error) {
// this should only be called with already-detected executables,
// but we'll double-check it anyway, so we can provide a tidy error message
// if it has changed between fingerprinting and execution.
executable := filepath.Join(pluginDir, filename)
f, err := os.Stat(executable)
if err != nil {
if os.IsNotExist(err) {
return nil, fmt.Errorf("%w: %q", ErrPluginNotExists, id)
return nil, fmt.Errorf("%w: %q", ErrPluginNotExists, filename)
}
return nil, err
}
if !helper.IsExecutable(f) {
return nil, fmt.Errorf("%w: %q", ErrPluginNotExecutable, id)
return nil, fmt.Errorf("%w: %q", ErrPluginNotExecutable, filename)
}
return &HostVolumePluginExternal{
ID: id,
ID: filename,
Executable: executable,
TargetPath: targetPath,
PluginDir: pluginDir,
log: log,
}, nil
}
@@ -151,6 +167,7 @@ type HostVolumePluginExternal struct {
ID string
Executable string
TargetPath string
PluginDir string
log hclog.Logger
}
@@ -158,15 +175,19 @@ type HostVolumePluginExternal struct {
// Fingerprint calls the executable with the following parameters:
// arguments: fingerprint
// environment:
// OPERATION=fingerprint
// DHV_OPERATION=fingerprint
//
// Response should be valid JSON on stdout, with a "version" key, e.g.:
// {"version": "0.0.1"}
// The version value should be a valid version number as allowed by
// version.NewVersion()
//
// Must complete within 5 seconds
func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFingerprint, error) {
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
cmd := exec.CommandContext(ctx, p.Executable, "fingerprint")
cmd.Env = []string{"OPERATION=fingerprint"}
cmd.Env = []string{EnvOperation + "=fingerprint"}
stdout, stderr, err := runCommand(cmd)
if err != nil {
p.log.Debug("error with plugin",
@@ -186,19 +207,23 @@ func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFing
// Create calls the executable with the following parameters:
// arguments: create {path to create}
// environment:
// OPERATION=create
// HOST_PATH={path to create}
// NODE_ID={Nomad node ID}
// VOLUME_NAME={name from the volume specification}
// CAPACITY_MIN_BYTES={capacity_min from the volume spec}
// CAPACITY_MAX_BYTES={capacity_max from the volume spec}
// PARAMETERS={json of parameters from the volume spec}
// DHV_OPERATION=create
// DHV_HOST_PATH={path to create}
// DHV_NODE_ID={Nomad node ID}
// DHV_VOLUME_NAME={name from the volume specification}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_CAPACITY_MIN_BYTES={capacity_min from the volume spec}
// DHV_CAPACITY_MAX_BYTES={capacity_max from the volume spec}
// DHV_PARAMETERS={json of parameters from the volume spec}
// DHV_PLUGIN_DIR={path to directory containing plugins}
//
// Response should be valid JSON on stdout with "path" and "bytes", e.g.:
// {"path": $HOST_PATH, "bytes": 50000000}
// "path" must be provided to confirm that the requested path is what was
// created by the plugin. "bytes" is the actual size of the volume created
// by the plugin; if excluded, it will default to 0.
//
// Must complete within 60 seconds (timeout on RPC)
func (p *HostVolumePluginExternal) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
@@ -208,11 +233,11 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
"NODE_ID=" + req.NodeID,
"VOLUME_NAME=" + req.Name,
fmt.Sprintf("CAPACITY_MIN_BYTES=%d", req.RequestedCapacityMinBytes),
fmt.Sprintf("CAPACITY_MAX_BYTES=%d", req.RequestedCapacityMaxBytes),
"PARAMETERS=" + string(params),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%d", EnvCapacityMin, req.RequestedCapacityMinBytes),
fmt.Sprintf("%s=%d", EnvCapacityMax, req.RequestedCapacityMaxBytes),
fmt.Sprintf("%s=%s", EnvParameters, params),
}
stdout, _, err := p.runPlugin(ctx, "create", req.ID, envVars)
@@ -228,19 +253,24 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
// an error here after the plugin has done who-knows-what.
return nil, err
}
// TODO: validate returned host path
return &pluginResp, nil
}
// Delete calls the executable with the following parameters:
// arguments: delete {path to create}
// environment:
// OPERATION=delete
// HOST_PATH={path to create}
// NODE_ID={Nomad node ID}
// VOLUME_NAME={name from the volume specification}
// PARAMETERS={json of parameters from the volume spec}
// DHV_OPERATION=delete
// DHV_HOST_PATH={path to create}
// DHV_NODE_ID={Nomad node ID}
// DHV_VOLUME_NAME={name from the volume specification}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_PARAMETERS={json of parameters from the volume spec}
// DHV_PLUGIN_DIR={path to directory containing plugins}
//
// Response on stdout is discarded.
//
// Must complete within 60 seconds (timeout on RPC)
func (p *HostVolumePluginExternal) Delete(ctx context.Context,
req *cstructs.ClientHostVolumeDeleteRequest) error {
@@ -250,9 +280,9 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
return fmt.Errorf("error marshaling volume pramaters: %w", err)
}
envVars := []string{
"NODE_ID=" + req.NodeID,
"VOLUME_NAME=" + req.Name,
"PARAMETERS=" + string(params),
fmt.Sprintf("%s=%s", EnvNodeID, req.NodeID),
fmt.Sprintf("%s=%s", EnvVolumeName, req.Name),
fmt.Sprintf("%s=%s", EnvParameters, params),
}
_, _, err = p.runPlugin(ctx, "delete", req.ID, envVars)
@@ -263,8 +293,10 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
}
// runPlugin executes the... executable with these additional env vars:
// OPERATION={op}
// HOST_PATH={p.TargetPath/volID}
// DHV_OPERATION={op}
// DHV_HOST_PATH={path to create}
// DHV_VOLUME_ID={Nomad volume ID}
// DHV_PLUGIN_DIR={path to directory containing plugins}
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
op, volID string, env []string) (stdout, stderr []byte, err error) {
@@ -279,8 +311,10 @@ func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
cmd := exec.CommandContext(ctx, p.Executable, op, path)
cmd.Env = append([]string{
"OPERATION=" + op,
"HOST_PATH=" + path,
fmt.Sprintf("%s=%s", EnvOperation, op),
fmt.Sprintf("%s=%s", EnvHostPath, path),
fmt.Sprintf("%s=%s", EnvVolumeID, volID),
fmt.Sprintf("%s=%s", EnvPluginDir, p.PluginDir),
}, env...)
stdout, stderr, err = runCommand(cmd)

View File

@@ -79,22 +79,23 @@ func TestNewHostVolumePluginExternal(t *testing.T) {
log := testlog.HCLogger(t)
var err error
_, err = NewHostVolumePluginExternal(log, "test-id", "non-existent", "target")
_, err = NewHostVolumePluginExternal(log, ".", "non-existent", "target")
must.ErrorIs(t, err, ErrPluginNotExists)
_, err = NewHostVolumePluginExternal(log, "test-id", "host_volume_plugin_test.go", "target")
_, err = NewHostVolumePluginExternal(log, ".", "host_volume_plugin_test.go", "target")
must.ErrorIs(t, err, ErrPluginNotExecutable)
t.Run("unix", func(t *testing.T) {
if runtime.GOOS == "windows" {
t.Skip("skipped because windows") // db TODO(1.10.0)
}
p, err := NewHostVolumePluginExternal(log, "test-id", "./test_fixtures/test_plugin.sh", "test-target")
p, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin.sh", "test-target")
must.NoError(t, err)
must.Eq(t, &HostVolumePluginExternal{
ID: "test-id",
Executable: "./test_fixtures/test_plugin.sh",
ID: "test_plugin.sh",
Executable: "test_fixtures/test_plugin.sh",
TargetPath: "test-target",
PluginDir: "./test_fixtures",
log: log,
}, p)
})
@@ -115,49 +116,47 @@ func TestHostVolumePluginExternal(t *testing.T) {
t.Run("happy", func(t *testing.T) {
log, getLogs := logRecorder(t)
plug := &HostVolumePluginExternal{
ID: "test-external-plugin",
Executable: "./test_fixtures/test_plugin.sh",
TargetPath: tmp,
log: log,
}
v, err := plug.Fingerprint(timeout(t))
plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin.sh", tmp)
must.NoError(t, err)
must.Eq(t, expectVersion, v.Version)
// fingerprint
v, err := plug.Fingerprint(timeout(t))
logged := getLogs()
must.NoError(t, err, must.Sprintf("logs: %s", logged))
must.Eq(t, expectVersion, v.Version, must.Sprintf("logs: %s", logged))
// create
resp, err := plug.Create(timeout(t),
&cstructs.ClientHostVolumeCreateRequest{
Name: "test-vol-name",
ID: volID,
NodeID: "test-node",
RequestedCapacityMinBytes: 5,
RequestedCapacityMaxBytes: 10,
Parameters: map[string]string{"key": "val"},
})
must.NoError(t, err)
logged = getLogs()
must.NoError(t, err, must.Sprintf("logs: %s", logged))
must.Eq(t, &HostVolumePluginCreateResponse{
Path: target,
SizeBytes: 5,
}, resp)
must.DirExists(t, target)
logged := getLogs()
must.StrContains(t, logged, "OPERATION=create") // stderr from `env`
must.StrContains(t, logged, `stdout="{`) // stdout from printf
// reset logger for next call
log, getLogs = logRecorder(t)
plug.log = log
// delete
err = plug.Delete(timeout(t),
&cstructs.ClientHostVolumeDeleteRequest{
Name: "test-vol-name",
ID: volID,
NodeID: "test-node",
Parameters: map[string]string{"key": "val"},
})
must.NoError(t, err)
must.DirNotExists(t, target)
logged = getLogs()
must.NoError(t, err, must.Sprintf("logs: %s", logged))
must.DirNotExists(t, target)
must.StrContains(t, logged, "OPERATION=delete") // stderr from `env`
must.StrContains(t, logged, "removed directory") // stdout from `rm -v`
})
@@ -165,15 +164,11 @@ func TestHostVolumePluginExternal(t *testing.T) {
t.Run("sad", func(t *testing.T) {
log, getLogs := logRecorder(t)
plug := &HostVolumePluginExternal{
ID: "test-external-plugin-sad",
Executable: "./test_fixtures/test_plugin_sad.sh",
TargetPath: tmp,
log: log,
}
plug, err := NewHostVolumePluginExternal(log, "./test_fixtures", "test_plugin_sad.sh", tmp)
must.NoError(t, err)
v, err := plug.Fingerprint(timeout(t))
must.EqError(t, err, `error getting version from plugin "test-external-plugin-sad": exit status 1`)
must.EqError(t, err, `error getting version from plugin "test_plugin_sad.sh": exit status 1`)
must.Nil(t, v)
logged := getLogs()
must.StrContains(t, logged, "fingerprint: sad plugin is sad")
@@ -191,7 +186,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
RequestedCapacityMaxBytes: 10,
Parameters: map[string]string{"key": "val"},
})
must.EqError(t, err, `error creating volume "test-vol-id" with plugin "test-external-plugin-sad": exit status 1`)
must.EqError(t, err, `error creating volume "test-vol-id" with plugin "test_plugin_sad.sh": exit status 1`)
must.Nil(t, resp)
logged = getLogs()
must.StrContains(t, logged, "create: sad plugin is sad")
@@ -206,7 +201,7 @@ func TestHostVolumePluginExternal(t *testing.T) {
NodeID: "test-node",
Parameters: map[string]string{"key": "val"},
})
must.EqError(t, err, `error deleting volume "test-vol-id" with plugin "test-external-plugin-sad": exit status 1`)
must.EqError(t, err, `error deleting volume "test-vol-id" with plugin "test_plugin_sad.sh": exit status 1`)
logged = getLogs()
must.StrContains(t, logged, "delete: sad plugin is sad")
must.StrContains(t, logged, "delete: it tells you all about it in stderr")

View File

@@ -8,7 +8,6 @@ import (
"errors"
"fmt"
"os"
"path/filepath"
"sync"
"github.com/hashicorp/go-hclog"
@@ -217,8 +216,7 @@ func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
return plug, nil
}
log := hvm.log.With("plugin_id", id)
path := filepath.Join(hvm.pluginDir, id)
return NewHostVolumePluginExternal(log, id, path, hvm.sharedMountDir)
return NewHostVolumePluginExternal(log, hvm.pluginDir, id, hvm.sharedMountDir)
}
// restoreFromState loads all volumes from client state and runs Create for

View File

@@ -426,6 +426,7 @@ func logRecorder(t *testing.T) (hclog.Logger, func() string) {
return logger, func() string {
bts, err := io.ReadAll(buf)
test.NoError(t, err)
buf.Reset()
return string(bts)
}
}

View File

@@ -7,26 +7,44 @@ set -xeuo pipefail
env 1>&2
test "$1" == "$OPERATION"
test "$1" == "$DHV_OPERATION"
echo 'all operations should ignore stderr' 1>&2
# since we will be running `rm -rf` (frightening),
# check to make sure DHV_HOST_PATH has a uuid shape in it.
# Nomad generates a volume ID and includes it in the path.
validate_path() {
if [[ ! "$DHV_HOST_PATH" =~ "test-vol-id" ]]; then
1>&2 echo "expected test vol ID in the DHV_HOST_PATH; got: '$DHV_HOST_PATH'"
exit 1
fi
}
case $1 in
fingerprint)
echo '{"version": "0.0.2"}' ;;
create)
test "$2" == "$HOST_PATH"
test "$NODE_ID" == 'test-node'
test "$PARAMETERS" == '{"key":"val"}'
test "$CAPACITY_MIN_BYTES" -eq 5
test "$CAPACITY_MAX_BYTES" -eq 10
test "$2" == "$DHV_HOST_PATH"
test "$DHV_NODE_ID" == 'test-node'
test "$DHV_VOLUME_NAME" == 'test-vol-name'
test "$DHV_VOLUME_ID" == 'test-vol-id'
test "$DHV_PARAMETERS" == '{"key":"val"}'
test "$DHV_CAPACITY_MIN_BYTES" -eq 5
test "$DHV_CAPACITY_MAX_BYTES" -eq 10
test "$DHV_PLUGIN_DIR" == './test_fixtures'
validate_path "$DHV_HOST_PATH"
mkdir "$2"
printf '{"path": "%s", "bytes": 5, "context": %s}' "$2" "$PARAMETERS"
printf '{"path": "%s", "bytes": 5, "context": %s}' "$2" "$DHV_PARAMETERS"
;;
delete)
test "$2" == "$HOST_PATH"
test "$NODE_ID" == 'test-node'
test "$PARAMETERS" == '{"key":"val"}'
test "$2" == "$DHV_HOST_PATH"
test "$DHV_NODE_ID" == 'test-node'
test "$DHV_VOLUME_NAME" == 'test-vol-name'
test "$DHV_VOLUME_ID" == 'test-vol-id'
test "$DHV_PARAMETERS" == '{"key":"val"}'
test "$DHV_PLUGIN_DIR" == './test_fixtures'
validate_path "$DHV_HOST_PATH"
rm -rfv "$2" ;;
*)
echo "unknown operation $1"

View File

@@ -42,26 +42,37 @@ plugin="$1"
op="$2"
alloc_mounts="${3:-/tmp}"
uuid="${4:-74564d17-ce50-0bc1-48e5-6feaa41ede48}"
node_id='0b62d807-6101-a80f-374d-e1c430abbf47'
plugin_dir="$(dirname "$plugin")"
case $op in
fingerprint)
args='fingerprint'
export DHV_OPERATION='fingerprint'
;;
create)
args="create $alloc_mounts/$uuid"
export HOST_PATH="$alloc_mounts/$uuid"
export VOLUME_NAME=test
export NODE_ID=0b62d807-6101-a80f-374d-e1c430abbf47
export CAPACITY_MAX_BYTES=50000000 # 50mb
export CAPACITY_MIN_BYTES=50000000 # 50mb
export PARAMETERS='{"a": "ayy"}'
export DHV_OPERATION='create'
export DHV_HOST_PATH="$alloc_mounts/$uuid"
export DHV_VOLUME_NAME=test
export DHV_VOLUME_ID="$uuid"
export DHV_NODE_ID="$node_id"
export DHV_CAPACITY_MAX_BYTES=50000000 # 50mb
export DHV_CAPACITY_MIN_BYTES=50000000 # 50mb
export DHV_PARAMETERS='{"a": "ayy"}'
export DHV_PLUGIN_DIR="$plugin_dir"
;;
delete)
args="delete $alloc_mounts/$uuid"
export HOST_PATH="$alloc_mounts/$uuid"
export PARAMETERS='{"a": "ayy"}'
export DHV_OPERATION='delete'
export DHV_HOST_PATH="$alloc_mounts/$uuid"
export DHV_NODE_ID="$node_id"
export DHV_VOLUME_NAME=test
export DHV_VOLUME_ID="$uuid"
export DHV_PARAMETERS='{"a": "ayy"}'
export DHV_PLUGIN_DIR="$plugin_dir"
;;
*)
@@ -69,6 +80,5 @@ case $op in
;;
esac
export OPERATION="$op"
set -x
eval "$plugin $args"

View File

@@ -120,7 +120,7 @@ delete_volume() {
case "$1" in
"create")
create_volume "$host_path" "$CAPACITY_MIN_BYTES"
create_volume "$host_path" "$DHV_CAPACITY_MIN_BYTES"
# output what Nomad expects
bytes="$(st "$host_path".$ext)"
printf '{"path": "%s", "bytes": %s}' "$host_path" "$bytes"