dynamic host volumes: client-side tests, comments, tidying (#24747)
@@ -20,17 +20,23 @@ import (
    "github.com/hashicorp/nomad/helper"
)

type PluginFingerprint struct {
    Version *version.Version `json:"version"`
}

// HostVolumePlugin manages the lifecycle of volumes.
type HostVolumePlugin interface {
    Fingerprint(ctx context.Context) (*PluginFingerprint, error)
    Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error)
    Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error
    // db TODO(1.10.0): update? resize? ??
}

// PluginFingerprint gets set on the node for volume scheduling.
// Plugins are expected to respond to 'fingerprint' calls with json that
// unmarshals to this struct.
type PluginFingerprint struct {
    Version *version.Version `json:"version"`
}

// HostVolumePluginCreateResponse gets stored on the volume in server state.
// Plugins are expected to respond to 'create' calls with json that
// unmarshals to this struct.
type HostVolumePluginCreateResponse struct {
    Path      string `json:"path"`
    SizeBytes int64  `json:"bytes"`
@@ -41,6 +47,8 @@ const HostVolumePluginMkdirVersion = "0.0.1"

var _ HostVolumePlugin = &HostVolumePluginMkdir{}

// HostVolumePluginMkdir is a plugin that creates a directory within the
// specified TargetPath. It is built-in to Nomad, so is always available.
type HostVolumePluginMkdir struct {
    ID         string
    TargetPath string
@@ -66,7 +74,8 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
    log.Debug("running plugin")

    resp := &HostVolumePluginCreateResponse{
        Path: path,
        Path: path,
        // "mkdir" volumes, being simple directories, have unrestricted size
        SizeBytes: 0,
    }

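As a rough aid to reading the built-in plugin, its create and delete operations amount to the following shell sketch (an illustration only, with placeholder variable names; the authoritative behavior is the Go code in this file):

    # create: make the volume directory under the target path; idempotent by design
    mkdir -p "$TARGET_PATH/$VOLUME_ID"

    # delete: remove that directory; also idempotent
    rm -rf "$TARGET_PATH/$VOLUME_ID"
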
@@ -109,6 +118,8 @@ func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHo

var _ HostVolumePlugin = &HostVolumePluginExternal{}

// NewHostVolumePluginExternal returns an external host volume plugin
// if the specified executable exists on disk.
func NewHostVolumePluginExternal(log hclog.Logger,
    id, executable, targetPath string) (*HostVolumePluginExternal, error) {
    // this should only be called with already-detected executables,
@@ -132,6 +143,10 @@ func NewHostVolumePluginExternal(log hclog.Logger,
    }, nil
}

// HostVolumePluginExternal calls an executable on disk. All operations
// *must* be idempotent, and safe to be called concurrently per volume.
// For each call, the executable's stdout and stderr may be logged, so plugin
// authors should not include any sensitive information in their plugin outputs.
type HostVolumePluginExternal struct {
    ID         string
    Executable string
@@ -140,6 +155,15 @@ type HostVolumePluginExternal struct {
    log hclog.Logger
}

// Fingerprint calls the executable with the following parameters:
// arguments: fingerprint
// environment:
//   OPERATION=fingerprint
//
// Response should be valid JSON on stdout, with a "version" key, e.g.:
// {"version": "0.0.1"}
// The version value should be a valid version number as allowed by
// version.NewVersion()
func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFingerprint, error) {
    cmd := exec.CommandContext(ctx, p.Executable, "fingerprint")
    cmd.Env = []string{"OPERATION=fingerprint"}
@@ -159,12 +183,28 @@ func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFing
    return fprint, nil
}

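For plugin authors, a minimal external plugin could satisfy the fingerprint contract described above along these lines (a hedged sketch, not this repository's test fixture; the case/esac skeleton is an assumption):

    #!/usr/bin/env bash
    # $1 is the operation name; OPERATION is also set in the environment
    case "$1" in
      fingerprint)
        # print the version JSON that Nomad parses into PluginFingerprint
        echo '{"version": "0.0.1"}'
        ;;
    esac
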
// Create calls the executable with the following parameters:
// arguments: create {path to create}
// environment:
//   OPERATION=create
//   HOST_PATH={path to create}
//   NODE_ID={Nomad node ID}
//   VOLUME_NAME={name from the volume specification}
//   CAPACITY_MIN_BYTES={capacity_min from the volume spec}
//   CAPACITY_MAX_BYTES={capacity_max from the volume spec}
//   PARAMETERS={json of parameters from the volume spec}
//
// Response should be valid JSON on stdout with "path" and "bytes", e.g.:
// {"path": $HOST_PATH, "bytes": 50000000}
// "path" must be provided to confirm that the requested path is what was
// created by the plugin. "bytes" is the actual size of the volume created
// by the plugin; if excluded, it will default to 0.
func (p *HostVolumePluginExternal) Create(ctx context.Context,
    req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {

    params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): document if this is nil, then PARAMETERS env will be "null"
    params, err := json.Marshal(req.Parameters)
    if err != nil {
        // this is a proper error, because users can set this in the volume spec
        // should never happen; req.Parameters is a simple map[string]string
        return nil, fmt.Errorf("error marshaling volume parameters: %w", err)
    }
    envVars := []string{
@@ -181,22 +221,37 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
    }

    var pluginResp HostVolumePluginCreateResponse
    err = json.Unmarshal(stdout, &pluginResp) // db TODO(1.10.0): if this fails, then the volume may have been created, according to the plugin, but Nomad will not save it
    err = json.Unmarshal(stdout, &pluginResp)
    if err != nil {
        // note: if a plugin does not return valid json, a volume may be
        // created without any respective state in Nomad, since we return
        // an error here after the plugin has done who-knows-what.
        return nil, err
    }
    return &pluginResp, nil
}

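Continuing the sketch above, a create handler might look like this (illustrative only; HOST_PATH, CAPACITY_MIN_BYTES, and PARAMETERS are the documented environment variables, everything else is an assumption):

    create)
      # create the requested path; must be idempotent
      mkdir -p "$HOST_PATH"
      # confirm the path and report the size actually provisioned
      echo "{\"path\": \"$HOST_PATH\", \"bytes\": ${CAPACITY_MIN_BYTES:-0}}"
      ;;
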
// Delete calls the executable with the following parameters:
// arguments: delete {path to delete}
// environment:
//   OPERATION=delete
//   HOST_PATH={path to delete}
//   NODE_ID={Nomad node ID}
//   VOLUME_NAME={name from the volume specification}
//   PARAMETERS={json of parameters from the volume spec}
//
// Response on stdout is discarded.
func (p *HostVolumePluginExternal) Delete(ctx context.Context,
    req *cstructs.ClientHostVolumeDeleteRequest) error {

    params, err := json.Marshal(req.Parameters)
    if err != nil {
        // should never happen; req.Parameters is a simple map[string]string
        return fmt.Errorf("error marshaling volume parameters: %w", err)
    }
    envVars := []string{
        "NODE_ID=" + req.NodeID,
        "VOLUME_NAME=" + req.Name,
        "PARAMETERS=" + string(params),
    }

@@ -207,6 +262,9 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context,
    return nil
}

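And a matching delete handler, again as a hedged sketch (stdout is discarded, so only the exit code and stderr matter here):

    delete)
      # remove whatever create produced; must be safe to run repeatedly
      rm -rf "$HOST_PATH"
      ;;
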
// runPlugin executes the... executable with these additional env vars:
// OPERATION={op}
// HOST_PATH={p.TargetPath/volID}
func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
    op, volID string, env []string) (stdout, stderr []byte, err error) {

@@ -239,6 +297,7 @@ func (p *HostVolumePluginExternal) runPlugin(ctx context.Context,
    return stdout, stderr, nil
}

// runCommand executes the provided Cmd and captures stdout and stderr.
func runCommand(cmd *exec.Cmd) (stdout, stderr []byte, err error) {
    var errBuf bytes.Buffer
    cmd.Stderr = io.Writer(&errBuf)

@@ -4,19 +4,13 @@
package hostvolumemanager

import (
    "bytes"
    "context"
    "io"
    "path/filepath"
    "runtime"
    "testing"
    "time"

    "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/go-version"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/helper/testlog"
    "github.com/shoenig/test"
    "github.com/shoenig/test/must"
)

@@ -37,23 +31,29 @@ func TestHostVolumePluginMkdir(t *testing.T) {
    must.NoError(t, err)

    t.Run("happy", func(t *testing.T) {
        resp, err := plug.Create(timeout(t),
            &cstructs.ClientHostVolumeCreateRequest{
                ID: volID, // minimum required by this plugin
            })
        must.NoError(t, err)
        must.Eq(t, &HostVolumePluginCreateResponse{
            Path:      target,
            SizeBytes: 0,
        }, resp)
        must.DirExists(t, target)
        // run multiple times, should be idempotent
        for range 2 {
            resp, err := plug.Create(timeout(t),
                &cstructs.ClientHostVolumeCreateRequest{
                    ID: volID, // minimum required by this plugin
                })
            must.NoError(t, err)
            must.Eq(t, &HostVolumePluginCreateResponse{
                Path:      target,
                SizeBytes: 0,
            }, resp)
            must.DirExists(t, target)
        }

        err = plug.Delete(timeout(t),
            &cstructs.ClientHostVolumeDeleteRequest{
                ID: volID,
            })
        must.NoError(t, err)
        must.DirNotExists(t, target)
        // delete should be idempotent, too
        for range 2 {
            err = plug.Delete(timeout(t),
                &cstructs.ClientHostVolumeDeleteRequest{
                    ID: volID,
                })
            must.NoError(t, err)
            must.DirNotExists(t, target)
        }
    })

    t.Run("sad", func(t *testing.T) {
@@ -75,6 +75,31 @@ func TestHostVolumePluginMkdir(t *testing.T) {
    })
}

func TestNewHostVolumePluginExternal(t *testing.T) {
    log := testlog.HCLogger(t)
    var err error

    _, err = NewHostVolumePluginExternal(log, "test-id", "non-existent", "target")
    must.ErrorIs(t, err, ErrPluginNotExists)

    _, err = NewHostVolumePluginExternal(log, "test-id", "host_volume_plugin_test.go", "target")
    must.ErrorIs(t, err, ErrPluginNotExecutable)

    t.Run("unix", func(t *testing.T) {
        if runtime.GOOS == "windows" {
            t.Skip("skipped because windows") // db TODO(1.10.0)
        }
        p, err := NewHostVolumePluginExternal(log, "test-id", "./test_fixtures/test_plugin.sh", "test-target")
        must.NoError(t, err)
        must.Eq(t, &HostVolumePluginExternal{
            ID:         "test-id",
            Executable: "./test_fixtures/test_plugin.sh",
            TargetPath: "test-target",
            log:        log,
        }, p)
    })
}

func TestHostVolumePluginExternal(t *testing.T) {
    if runtime.GOOS == "windows" {
        t.Skip("skipped because windows") // db TODO(1.10.0)
@@ -187,29 +212,3 @@ func TestHostVolumePluginExternal(t *testing.T) {
        must.StrContains(t, logged, "delete: it tells you all about it in stderr")
    })
}

// timeout provides a context that times out in 1 second
func timeout(t *testing.T) context.Context {
    t.Helper()
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    t.Cleanup(cancel)
    return ctx
}

// logRecorder is here so we can assert that stdout/stderr appear in logs
func logRecorder(t *testing.T) (hclog.Logger, func() string) {
    t.Helper()
    buf := &bytes.Buffer{}
    logger := hclog.New(&hclog.LoggerOptions{
        Name:            "log-recorder",
        Output:          buf,
        Level:           hclog.Debug,
        IncludeLocation: true,
        DisableTime:     true,
    })
    return logger, func() string {
        bts, err := io.ReadAll(buf)
        test.NoError(t, err)
        return string(bts)
    }
}

@@ -21,12 +21,14 @@ var (
    ErrPluginNotExecutable = errors.New("plugin not executable")
)

// HostVolumeStateManager manages the lifecycle of volumes in client state.
type HostVolumeStateManager interface {
    PutDynamicHostVolume(*cstructs.HostVolumeState) error
    GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
    DeleteDynamicHostVolume(string) error
}

// Config is used to configure a HostVolumeManager.
type Config struct {
    // PluginDir is where external plugins may be found.
    PluginDir string
@@ -43,97 +45,38 @@ type Config struct {
    UpdateNodeVols HostVolumeNodeUpdater
}

// HostVolumeManager executes plugins, manages volume metadata in client state,
// and registers volumes with the client node.
type HostVolumeManager struct {
    pluginDir      string
    sharedMountDir string
    stateMgr       HostVolumeStateManager
    updateNodeVols HostVolumeNodeUpdater
    builtIns       map[string]HostVolumePlugin
    log            hclog.Logger
}

// NewHostVolumeManager includes default builtin plugins.
func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager {
    // db TODO(1.10.0): document plugin config options
    logger = logger.Named("host_volume_manager")
    return &HostVolumeManager{
        pluginDir:      config.PluginDir,
        sharedMountDir: config.SharedMountDir,
        stateMgr:       config.StateMgr,
        updateNodeVols: config.UpdateNodeVols,
        log:            logger.Named("host_volume_manager"),
        builtIns: map[string]HostVolumePlugin{
            HostVolumePluginMkdirID: &HostVolumePluginMkdir{
                ID:         HostVolumePluginMkdirID,
                TargetPath: config.SharedMountDir,
                log:        logger.With("plugin_id", HostVolumePluginMkdirID),
            },
        },
        log: logger,
    }
}

func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {
    if req == nil || resp == nil {
        return nil
    }
    return &structs.ClientHostVolumeConfig{
        Name: req.Name,
        ID:   req.ID,
        Path: resp.Path,

        // dynamic volumes, like CSI, have more robust `capabilities`,
        // so we always set ReadOnly to false, and let the scheduler
        // decide when to ignore this and check capabilities instead.
        ReadOnly: false,
    }
}

func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, error) {
    vols, err := hvm.stateMgr.GetDynamicHostVolumes()
    if err != nil {
        return nil, err
    }

    volumes := make(VolumeMap)
    var mut sync.Mutex

    if len(vols) == 0 {
        return volumes, nil // nothing to do
    }

    // re-"create" the volumes - plugins have the best knowledge of their
    // side effects, and they must be idempotent.
    group := multierror.Group{}
    for _, vol := range vols {
        group.Go(func() error { // db TODO(1.10.0): document that plugins must be safe to run concurrently
            // missing plugins with associated volumes in state are considered
            // client-stopping errors. they need to be fixed by cluster admins.
            plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
            if err != nil {
                return err
            }

            resp, err := plug.Create(ctx, vol.CreateReq)
            if err != nil {
                // plugin execution errors are only logged
                hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
                return nil
            }
            mut.Lock()
            volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
            mut.Unlock()
            return nil
        })
    }
    mErr := group.Wait()
    return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
}

func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
    log := hvm.log.With("plugin_id", id)

    if id == HostVolumePluginMkdirID {
        return &HostVolumePluginMkdir{
            ID:         HostVolumePluginMkdirID,
            TargetPath: hvm.sharedMountDir,
            log:        log,
        }, nil
    }

    path := filepath.Join(hvm.pluginDir, id)
    return NewHostVolumePluginExternal(log, id, path, hvm.sharedMountDir)
}

// Create runs the appropriate plugin for the given request, saves the request
// to state, and updates the node with the volume.
func (hvm *HostVolumeManager) Create(ctx context.Context,
    req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) {

@@ -181,6 +124,8 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
    return resp, nil
}

// Delete runs the appropriate plugin for the given request, removes it from
// state, and updates the node to remove the volume.
func (hvm *HostVolumeManager) Delete(ctx context.Context,
    req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) {

@@ -208,3 +153,71 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,

    return resp, nil
}

// getPlugin finds either a built-in plugin or an external plugin.
func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
    if plug, ok := hvm.builtIns[id]; ok {
        return plug, nil
    }
    log := hvm.log.With("plugin_id", id)
    path := filepath.Join(hvm.pluginDir, id)
    return NewHostVolumePluginExternal(log, id, path, hvm.sharedMountDir)
}

// restoreFromState loads all volumes from client state and runs Create for
// each one, so volumes are restored upon agent restart or host reboot.
func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, error) {
    vols, err := hvm.stateMgr.GetDynamicHostVolumes()
    if err != nil {
        return nil, err
    }

    volumes := make(VolumeMap)
    var mut sync.Mutex

    if len(vols) == 0 {
        return volumes, nil // nothing to do
    }

    // re-"create" the volumes - plugins have the best knowledge of their
    // side effects, and they must be idempotent.
    group := multierror.Group{}
    for _, vol := range vols {
        group.Go(func() error {
            // missing plugins with associated volumes in state are considered
            // client-stopping errors. they need to be fixed by cluster admins.
            plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
            if err != nil {
                return err
            }

            resp, err := plug.Create(ctx, vol.CreateReq)
            if err != nil {
                // plugin execution errors are only logged
                hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
                return nil
            }
            mut.Lock()
            volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
            mut.Unlock()
            return nil
        })
    }
    mErr := group.Wait()
    return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
}

// genVolConfig generates the host volume config for the node to report as
// available to the servers for job scheduling.
func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {
    return &structs.ClientHostVolumeConfig{
        Name: req.Name,
        ID:   req.ID,
        Path: resp.Path,

        // dynamic volumes, like CSI, have more robust `capabilities`,
        // so we always set ReadOnly to false, and let the scheduler
        // decide when to ignore this and check capabilities instead.
        ReadOnly: false,
    }
}

@@ -4,21 +4,184 @@
package hostvolumemanager

import (
    "bytes"
    "context"
    "errors"
    "io"
    "path/filepath"
    "testing"
    "time"

    "github.com/hashicorp/go-hclog"
    "github.com/hashicorp/go-version"
    cstate "github.com/hashicorp/nomad/client/state"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/helper/testlog"
    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/shoenig/test"
    "github.com/shoenig/test/must"
)

// db TODO(1.10.0): improve hostvolumemanager tests.
func TestHostVolumeManager(t *testing.T) {
    log := testlog.HCLogger(t)
    tmp := t.TempDir()
    errDB := &cstate.ErrDB{}
    memDB := cstate.NewMemDB(log)
    node := newFakeNode()

func TestNewHostVolumeManager_restoreState(t *testing.T) {
    hvm := NewHostVolumeManager(log, Config{
        PluginDir:      "./test_fixtures",
        SharedMountDir: tmp,
        StateMgr:       errDB,
        UpdateNodeVols: node.updateVol,
    })

    plug := &fakePlugin{mountDir: tmp}
    hvm.builtIns["test-plugin"] = plug

    ctx := timeout(t)

    t.Run("create", func(t *testing.T) {
        // plugin doesn't exist
        req := &cstructs.ClientHostVolumeCreateRequest{
            ID:       "vol-id",
            Name:     "vol-name",
            PluginID: "nope",

            RequestedCapacityMinBytes: 5,
        }
        _, err := hvm.Create(ctx, req)
        must.ErrorIs(t, err, ErrPluginNotExists)

        // error from plugin
        req.PluginID = "test-plugin"
        plug.createErr = errors.New("sad create")
        _, err = hvm.Create(ctx, req)
        must.ErrorIs(t, err, plug.createErr)
        plug.reset()

        // error saving state, then error from cleanup attempt
        plug.deleteErr = errors.New("sad delete")
        _, err = hvm.Create(ctx, req)
        must.ErrorIs(t, err, cstate.ErrDBError)
        must.ErrorIs(t, err, plug.deleteErr)
        plug.reset()

        // error saving state, successful cleanup
        _, err = hvm.Create(ctx, req)
        must.ErrorIs(t, err, cstate.ErrDBError)
        must.Eq(t, "vol-id", plug.deleted)
        plug.reset()

        // happy path
        hvm.stateMgr = memDB
        resp, err := hvm.Create(ctx, req)
        must.NoError(t, err)
        must.Eq(t, &cstructs.ClientHostVolumeCreateResponse{
            VolumeName:    "vol-name",
            VolumeID:      "vol-id",
            HostPath:      tmp,
            CapacityBytes: 5,
        }, resp)
        stateDBs, err := memDB.GetDynamicHostVolumes()
        must.NoError(t, err)
        // should be saved to state
        must.Len(t, 1, stateDBs)
        must.Eq(t, "vol-id", stateDBs[0].ID)
        must.Eq(t, "vol-id", stateDBs[0].CreateReq.ID)
        // should be registered with node
        must.MapContainsKey(t, node.vols, "vol-name", must.Sprintf("no vol-name in %+v", node.vols))
    })

    t.Run("delete", func(t *testing.T) {
        // plugin doesn't exist
        req := &cstructs.ClientHostVolumeDeleteRequest{
            ID:       "vol-id",
            Name:     "vol-name",
            PluginID: "nope",
        }
        _, err := hvm.Delete(ctx, req)
        must.ErrorIs(t, err, ErrPluginNotExists)

        // error from plugin
        req.PluginID = "test-plugin"
        plug.deleteErr = errors.New("sad delete")
        _, err = hvm.Delete(ctx, req)
        must.ErrorIs(t, err, plug.deleteErr)
        plug.reset()

        // error saving state
        hvm.stateMgr = errDB
        _, err = hvm.Delete(ctx, req)
        must.ErrorIs(t, err, cstate.ErrDBError)

        // happy path
        // add stuff that should be deleted
        hvm.stateMgr = memDB
        must.NoError(t, memDB.PutDynamicHostVolume(&cstructs.HostVolumeState{
            ID:        "vol-id",
            CreateReq: &cstructs.ClientHostVolumeCreateRequest{},
        }))
        node.vols = VolumeMap{
            "vol-name": &structs.ClientHostVolumeConfig{},
        }
        // and delete it
        resp, err := hvm.Delete(ctx, req)
        must.NoError(t, err)
        must.Eq(t, &cstructs.ClientHostVolumeDeleteResponse{
            VolumeName: "vol-name",
            VolumeID:   "vol-id",
        }, resp)
        must.Eq(t, VolumeMap{}, node.vols, must.Sprint("vols should be deleted from node"))
        stateVols, err := memDB.GetDynamicHostVolumes()
        must.NoError(t, err)
        must.Nil(t, stateVols, must.Sprint("vols should be deleted from state"))
    })
}

type fakePlugin struct {
    mountDir       string
    created        string
    deleted        string
    fingerprintErr error
    createErr      error
    deleteErr      error
}

func (p *fakePlugin) reset() {
    p.deleted, p.fingerprintErr, p.createErr, p.deleteErr = "", nil, nil, nil
}

func (p *fakePlugin) Fingerprint(_ context.Context) (*PluginFingerprint, error) {
    if p.fingerprintErr != nil {
        return nil, p.fingerprintErr
    }
    v, err := version.NewVersion("0.0.1")
    return &PluginFingerprint{
        Version: v,
    }, err
}

func (p *fakePlugin) Create(_ context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
    if p.createErr != nil {
        return nil, p.createErr
    }
    p.created = req.ID
    return &HostVolumePluginCreateResponse{
        Path:      p.mountDir,
        SizeBytes: req.RequestedCapacityMinBytes,
    }, nil
}

func (p *fakePlugin) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {
    if p.deleteErr != nil {
        return p.deleteErr
    }
    p.deleted = req.ID
    return nil
}

func TestHostVolumeManager_restoreFromState(t *testing.T) {
    log := testlog.HCLogger(t)
    vol := &cstructs.HostVolumeState{
        ID: "test-vol-id",
@@ -28,7 +191,18 @@ func TestNewHostVolumeManager_restoreState(t *testing.T) {
            PluginID: "mkdir",
        },
    }
    fNode := newFakeNode()
    node := newFakeNode()

    t.Run("no vols", func(t *testing.T) {
        state := cstate.NewMemDB(log)
        hvm := NewHostVolumeManager(log, Config{
            StateMgr: state,
            // no other fields are necessary when there are zero volumes
        })
        vols, err := hvm.restoreFromState(timeout(t))
        must.NoError(t, err)
        must.Eq(t, VolumeMap{}, vols)
    })

    t.Run("happy", func(t *testing.T) {
        // put our volume in state
@@ -42,14 +216,12 @@ func TestNewHostVolumeManager_restoreState(t *testing.T) {

        hvm := NewHostVolumeManager(log, Config{
            StateMgr:       state,
            UpdateNodeVols: fNode.updateVol,
            UpdateNodeVols: node.updateVol,
            PluginDir:      "/wherever",
            SharedMountDir: mountDir,
        })

        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        vols, err := hvm.restoreFromState(ctx)
        vols, err := hvm.restoreFromState(timeout(t))
        must.NoError(t, err)

        expect := map[string]*structs.ClientHostVolumeConfig{
@@ -65,23 +237,55 @@ func TestNewHostVolumeManager_restoreState(t *testing.T) {
        must.DirExists(t, volPath)
    })

    t.Run("get error", func(t *testing.T) {
    t.Run("state error", func(t *testing.T) {
        state := &cstate.ErrDB{}
        hvm := NewHostVolumeManager(log, Config{
            StateMgr:       state,
            UpdateNodeVols: fNode.updateVol,
            PluginDir:      "/wherever",
            SharedMountDir: "/wherever",
        })
        ctx, cancel := context.WithTimeout(context.Background(), time.Second)
        defer cancel()
        vols, err := hvm.restoreFromState(ctx)
        // error loading state should break the world
        hvm := NewHostVolumeManager(log, Config{StateMgr: state})
        vols, err := hvm.restoreFromState(timeout(t))
        must.ErrorIs(t, err, cstate.ErrDBError)
        must.Nil(t, vols)
    })

    // db TODO: test plugin error
    t.Run("plugin missing", func(t *testing.T) {
        state := cstate.NewMemDB(log)
        vol := &cstructs.HostVolumeState{
            CreateReq: &cstructs.ClientHostVolumeCreateRequest{
                PluginID: "nonexistent-plugin",
            },
        }
        must.NoError(t, state.PutDynamicHostVolume(vol))

        hvm := NewHostVolumeManager(log, Config{StateMgr: state})
        vols, err := hvm.restoreFromState(timeout(t))
        must.ErrorIs(t, err, ErrPluginNotExists)
        must.MapEmpty(t, vols)
    })

    t.Run("plugin error", func(t *testing.T) {
        state := cstate.NewMemDB(log)
        vol := &cstructs.HostVolumeState{
            ID: "test-volume",
            CreateReq: &cstructs.ClientHostVolumeCreateRequest{
                PluginID: "test-plugin",
            },
        }
        must.NoError(t, state.PutDynamicHostVolume(vol))

        log, getLogs := logRecorder(t)
        hvm := NewHostVolumeManager(log, Config{StateMgr: state})
        plug := &fakePlugin{
            createErr: errors.New("sad create"),
        }
        hvm.builtIns["test-plugin"] = plug

        vols, err := hvm.restoreFromState(timeout(t))
        // error during restore should not halt the whole client
        must.NoError(t, err)
        must.NotNil(t, vols)
        // but it should log
        logs := getLogs()
        must.StrContains(t, logs, "[ERROR]")
        must.StrContains(t, logs, `failed to restore: plugin_id=test-plugin volume_id=test-volume error="sad create"`)
    })
}

type fakeNode struct {
@@ -97,3 +301,29 @@ func newFakeNode() *fakeNode {
        vols: make(VolumeMap),
    }
}

// timeout provides a context that times out in 1 second
func timeout(t *testing.T) context.Context {
    t.Helper()
    ctx, cancel := context.WithTimeout(context.Background(), time.Second)
    t.Cleanup(cancel)
    return ctx
}

// logRecorder is here so we can assert that stdout/stderr appear in logs
func logRecorder(t *testing.T) (hclog.Logger, func() string) {
    t.Helper()
    buf := &bytes.Buffer{}
    logger := hclog.New(&hclog.LoggerOptions{
        Name:            "log-recorder",
        Output:          buf,
        Level:           hclog.Debug,
        IncludeLocation: true,
        DisableTime:     true,
    })
    return logger, func() string {
        bts, err := io.ReadAll(buf)
        test.NoError(t, err)
        return string(bts)
    }
}

@@ -41,6 +41,7 @@ func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolu
}

// WaitForFirstFingerprint implements client.FingerprintingPluginManager
// so any existing volumes are added to the client node on agent start.
func (hvm *HostVolumeManager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
    // the fingerprint manager puts batchFirstFingerprintsTimeout (50 seconds)
    // on the context that it sends to us here so we don't need another

@@ -6,6 +6,9 @@ package hostvolumemanager
import (
    "testing"

    "github.com/hashicorp/nomad/client/state"
    cstructs "github.com/hashicorp/nomad/client/structs"
    "github.com/hashicorp/nomad/helper/testlog"
    "github.com/hashicorp/nomad/nomad/structs"
    "github.com/shoenig/test/must"
)
@@ -79,3 +82,46 @@ func TestUpdateVolumeMap(t *testing.T) {
        })
    }
}

func TestWaitForFirstFingerprint(t *testing.T) {
    log := testlog.HCLogger(t)
    tmp := t.TempDir()
    memDB := state.NewMemDB(log)
    node := newFakeNode()
    hvm := NewHostVolumeManager(log, Config{
        PluginDir:      "",
        SharedMountDir: tmp,
        StateMgr:       memDB,
        UpdateNodeVols: node.updateVol,
    })
    plug := &fakePlugin{mountDir: tmp}
    hvm.builtIns = map[string]HostVolumePlugin{
        "test-plugin": plug,
    }
    must.NoError(t, memDB.PutDynamicHostVolume(&cstructs.HostVolumeState{
        ID: "vol-id",
        CreateReq: &cstructs.ClientHostVolumeCreateRequest{
            ID:       "vol-id",
            Name:     "vol-name",
            PluginID: "test-plugin",
        },
    }))

    ctx := timeout(t)
    done := hvm.WaitForFirstFingerprint(ctx)
    select {
    case <-ctx.Done():
        t.Fatal("fingerprint timed out")
    case <-done:
    }

    must.Eq(t, "vol-id", plug.created)
    must.Eq(t, VolumeMap{
        "vol-name": &structs.ClientHostVolumeConfig{
            Name:     "vol-name",
            ID:       "vol-id",
            Path:     tmp,
            ReadOnly: false,
        },
    }, node.vols)
}

@@ -230,7 +230,6 @@ type ClientConfig struct {
    AllocMountsDir string `hcl:"alloc_mounts_dir"`

    // HostVolumePluginDir is the directory that contains dynamic host volume plugins
    // db TODO(1.10.0): document default directory is alongside alloc_mounts
    HostVolumePluginDir string `hcl:"host_volume_plugin_dir"`

    // Servers is a list of known server addresses. These are as "host:port"

@@ -56,7 +56,6 @@ case $op in
    export CAPACITY_MAX_BYTES=50000000 # 50mb
    export CAPACITY_MIN_BYTES=50000000 # 50mb
    export PARAMETERS='{"a": "ayy"}'
    # db TODO(1.10.0): check stdout
    ;;

  delete)

@@ -2,8 +2,6 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

# db TODO(1.10.0): where does PATH come from here? somewhere implicit? /sbin/ and /bin/ and ...?

set -euo pipefail

version='0.0.1'

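For manual smoke-testing, a plugin that follows this interface can be exercised directly from a shell, roughly like so (hypothetical plugin path and values, not commands taken from this commit):

    # fingerprint: expect {"version": ...} on stdout
    OPERATION=fingerprint ./my_plugin.sh fingerprint

    # create: expect {"path": ..., "bytes": ...} on stdout
    OPERATION=create HOST_PATH=/tmp/vol-id NODE_ID=node-1 VOLUME_NAME=vol-name \
      CAPACITY_MIN_BYTES=50000000 CAPACITY_MAX_BYTES=50000000 PARAMETERS='{}' \
      ./my_plugin.sh create /tmp/vol-id

    # delete: stdout is discarded; a zero exit code means success
    OPERATION=delete HOST_PATH=/tmp/vol-id NODE_ID=node-1 VOLUME_NAME=vol-name \
      PARAMETERS='{}' ./my_plugin.sh delete /tmp/vol-id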