diff --git a/client/hostvolumemanager/host_volume_plugin.go b/client/hostvolumemanager/host_volume_plugin.go
index 961466b12..ce113a4e8 100644
--- a/client/hostvolumemanager/host_volume_plugin.go
+++ b/client/hostvolumemanager/host_volume_plugin.go
@@ -20,17 +20,23 @@ import (
 	"github.com/hashicorp/nomad/helper"
 )
 
-type PluginFingerprint struct {
-	Version *version.Version `json:"version"`
-}
-
+// HostVolumePlugin manages the lifecycle of volumes.
 type HostVolumePlugin interface {
 	Fingerprint(ctx context.Context) (*PluginFingerprint, error)
 	Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error)
 	Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error
-	// db TODO(1.10.0): update? resize? ??
 }
 
+// PluginFingerprint gets set on the node for volume scheduling.
+// Plugins are expected to respond to 'fingerprint' calls with json that
+// unmarshals to this struct.
+type PluginFingerprint struct {
+	Version *version.Version `json:"version"`
+}
+
+// HostVolumePluginCreateResponse gets stored on the volume in server state.
+// Plugins are expected to respond to 'create' calls with json that
+// unmarshals to this struct.
 type HostVolumePluginCreateResponse struct {
 	Path      string `json:"path"`
 	SizeBytes int64  `json:"bytes"`
@@ -41,6 +47,8 @@ const HostVolumePluginMkdirVersion = "0.0.1"
 
 var _ HostVolumePlugin = &HostVolumePluginMkdir{}
 
+// HostVolumePluginMkdir is a plugin that creates a directory within the
+// specified TargetPath. It is built into Nomad, so it is always available.
 type HostVolumePluginMkdir struct {
 	ID         string
 	TargetPath string
@@ -66,7 +74,8 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
 	log.Debug("running plugin")
 
 	resp := &HostVolumePluginCreateResponse{
-		Path:      path,
+		Path: path,
+		// "mkdir" volumes, being simple directories, have unrestricted size
 		SizeBytes: 0,
 	}
 
@@ -109,6 +118,8 @@ func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHo
 
 var _ HostVolumePlugin = &HostVolumePluginExternal{}
 
+// NewHostVolumePluginExternal returns an external host volume plugin
+// if the specified executable exists on disk.
 func NewHostVolumePluginExternal(log hclog.Logger,
 	id, executable, targetPath string) (*HostVolumePluginExternal, error) {
 	// this should only be called with already-detected executables,
@@ -132,6 +143,10 @@ func NewHostVolumePluginExternal(log hclog.Logger,
 	}, nil
 }
 
+// HostVolumePluginExternal calls an executable on disk. All operations
+// *must* be idempotent, and safe to be called concurrently per volume.
+// For each call, the executable's stdout and stderr may be logged, so plugin
+// authors should not include any sensitive information in their plugin outputs.
 type HostVolumePluginExternal struct {
 	ID         string
 	Executable string
@@ -140,6 +155,15 @@ type HostVolumePluginExternal struct {
 	log hclog.Logger
 }
 
+// Fingerprint calls the executable with the following parameters:
+// arguments: fingerprint
+// environment:
+//   OPERATION=fingerprint
+//
+// Response should be valid JSON on stdout, with a "version" key, e.g.:
+// {"version": "0.0.1"}
+// The version value should be a valid version number as allowed by
+// version.NewVersion()
 func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFingerprint, error) {
 	cmd := exec.CommandContext(ctx, p.Executable, "fingerprint")
 	cmd.Env = []string{"OPERATION=fingerprint"}
@@ -159,12 +183,28 @@ func (p *HostVolumePluginExternal) Fingerprint(ctx context.Context) (*PluginFing
 	return fprint, nil
 }
 
+// Create calls the executable with the following parameters:
+// arguments: create {path to create}
+// environment:
+//   OPERATION=create
+//   HOST_PATH={path to create}
+//   NODE_ID={Nomad node ID}
+//   VOLUME_NAME={name from the volume specification}
+//   CAPACITY_MIN_BYTES={capacity_min from the volume spec}
+//   CAPACITY_MAX_BYTES={capacity_max from the volume spec}
+//   PARAMETERS={json of parameters from the volume spec}
+//
+// Response should be valid JSON on stdout with "path" and "bytes", e.g.:
+// {"path": $HOST_PATH, "bytes": 50000000}
+// "path" must be provided to confirm that the requested path is what was
+// created by the plugin. "bytes" is the actual size of the volume created
+// by the plugin; if excluded, it will default to 0.
 func (p *HostVolumePluginExternal) Create(ctx context.Context,
 	req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
 
-	params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): document if this is nil, then PARAMETERS env will be "null"
+	params, err := json.Marshal(req.Parameters)
 	if err != nil {
-		// this is a proper error, because users can set this in the volume spec
+		// should never happen; req.Parameters is a simple map[string]string
 		return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
 	}
 	envVars := []string{
@@ -181,22 +221,37 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
 	}
 
 	var pluginResp HostVolumePluginCreateResponse
-	err = json.Unmarshal(stdout, &pluginResp) // db TODO(1.10.0): if this fails, then the volume may have been created, according to the plugin, but Nomad will not save it
+	err = json.Unmarshal(stdout, &pluginResp)
 	if err != nil {
+		// note: if a plugin does not return valid json, a volume may be
+		// created without any respective state in Nomad, since we return
+		// an error here after the plugin has done who-knows-what.
 		return nil, err
 	}
 	return &pluginResp, nil
 }
 
+// Delete calls the executable with the following parameters:
+// arguments: delete {path to delete}
+// environment:
+//   OPERATION=delete
+//   HOST_PATH={path to delete}
+//   NODE_ID={Nomad node ID}
+//   VOLUME_NAME={name from the volume specification}
+//   PARAMETERS={json of parameters from the volume spec}
+//
+// Response on stdout is discarded.
func (p *HostVolumePluginExternal) Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error { params, err := json.Marshal(req.Parameters) if err != nil { + // should never happen; req.Parameters is a simple map[string]string return fmt.Errorf("error marshaling volume pramaters: %w", err) } envVars := []string{ "NODE_ID=" + req.NodeID, + "VOLUME_NAME=" + req.Name, "PARAMETERS=" + string(params), } @@ -207,6 +262,9 @@ func (p *HostVolumePluginExternal) Delete(ctx context.Context, return nil } +// runPlugin executes the... executable with these additional env vars: +// OPERATION={op} +// HOST_PATH={p.TargetPath/volID} func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, op, volID string, env []string) (stdout, stderr []byte, err error) { @@ -239,6 +297,7 @@ func (p *HostVolumePluginExternal) runPlugin(ctx context.Context, return stdout, stderr, nil } +// runCommand executes the provided Cmd and captures stdout and stderr. func runCommand(cmd *exec.Cmd) (stdout, stderr []byte, err error) { var errBuf bytes.Buffer cmd.Stderr = io.Writer(&errBuf) diff --git a/client/hostvolumemanager/host_volume_plugin_test.go b/client/hostvolumemanager/host_volume_plugin_test.go index 0552810bb..de008bbd8 100644 --- a/client/hostvolumemanager/host_volume_plugin_test.go +++ b/client/hostvolumemanager/host_volume_plugin_test.go @@ -4,19 +4,13 @@ package hostvolumemanager import ( - "bytes" - "context" - "io" "path/filepath" "runtime" "testing" - "time" - "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-version" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/testlog" - "github.com/shoenig/test" "github.com/shoenig/test/must" ) @@ -37,23 +31,29 @@ func TestHostVolumePluginMkdir(t *testing.T) { must.NoError(t, err) t.Run("happy", func(t *testing.T) { - resp, err := plug.Create(timeout(t), - &cstructs.ClientHostVolumeCreateRequest{ - ID: volID, // minimum required by this plugin - }) - must.NoError(t, err) - must.Eq(t, &HostVolumePluginCreateResponse{ - Path: target, - SizeBytes: 0, - }, resp) - must.DirExists(t, target) + // run multiple times, should be idempotent + for range 2 { + resp, err := plug.Create(timeout(t), + &cstructs.ClientHostVolumeCreateRequest{ + ID: volID, // minimum required by this plugin + }) + must.NoError(t, err) + must.Eq(t, &HostVolumePluginCreateResponse{ + Path: target, + SizeBytes: 0, + }, resp) + must.DirExists(t, target) + } - err = plug.Delete(timeout(t), - &cstructs.ClientHostVolumeDeleteRequest{ - ID: volID, - }) - must.NoError(t, err) - must.DirNotExists(t, target) + // delete should be idempotent, too + for range 2 { + err = plug.Delete(timeout(t), + &cstructs.ClientHostVolumeDeleteRequest{ + ID: volID, + }) + must.NoError(t, err) + must.DirNotExists(t, target) + } }) t.Run("sad", func(t *testing.T) { @@ -75,6 +75,31 @@ func TestHostVolumePluginMkdir(t *testing.T) { }) } +func TestNewHostVolumePluginExternal(t *testing.T) { + log := testlog.HCLogger(t) + var err error + + _, err = NewHostVolumePluginExternal(log, "test-id", "non-existent", "target") + must.ErrorIs(t, err, ErrPluginNotExists) + + _, err = NewHostVolumePluginExternal(log, "test-id", "host_volume_plugin_test.go", "target") + must.ErrorIs(t, err, ErrPluginNotExecutable) + + t.Run("unix", func(t *testing.T) { + if runtime.GOOS == "windows" { + t.Skip("skipped because windows") // db TODO(1.10.0) + } + p, err := NewHostVolumePluginExternal(log, "test-id", "./test_fixtures/test_plugin.sh", "test-target") + must.NoError(t, err) + must.Eq(t, 
&HostVolumePluginExternal{ + ID: "test-id", + Executable: "./test_fixtures/test_plugin.sh", + TargetPath: "test-target", + log: log, + }, p) + }) +} + func TestHostVolumePluginExternal(t *testing.T) { if runtime.GOOS == "windows" { t.Skip("skipped because windows") // db TODO(1.10.0) @@ -187,29 +212,3 @@ func TestHostVolumePluginExternal(t *testing.T) { must.StrContains(t, logged, "delete: it tells you all about it in stderr") }) } - -// timeout provides a context that times out in 1 second -func timeout(t *testing.T) context.Context { - t.Helper() - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - t.Cleanup(cancel) - return ctx -} - -// logRecorder is here so we can assert that stdout/stderr appear in logs -func logRecorder(t *testing.T) (hclog.Logger, func() string) { - t.Helper() - buf := &bytes.Buffer{} - logger := hclog.New(&hclog.LoggerOptions{ - Name: "log-recorder", - Output: buf, - Level: hclog.Debug, - IncludeLocation: true, - DisableTime: true, - }) - return logger, func() string { - bts, err := io.ReadAll(buf) - test.NoError(t, err) - return string(bts) - } -} diff --git a/client/hostvolumemanager/host_volumes.go b/client/hostvolumemanager/host_volumes.go index 82ccb8f47..aea84ee89 100644 --- a/client/hostvolumemanager/host_volumes.go +++ b/client/hostvolumemanager/host_volumes.go @@ -21,12 +21,14 @@ var ( ErrPluginNotExecutable = errors.New("plugin not executable") ) +// HostVolumeStateManager manages the lifecycle of volumes in client state. type HostVolumeStateManager interface { PutDynamicHostVolume(*cstructs.HostVolumeState) error GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) DeleteDynamicHostVolume(string) error } +// Config is used to configure a HostVolumeManager. type Config struct { // PluginDir is where external plugins may be found. PluginDir string @@ -43,97 +45,38 @@ type Config struct { UpdateNodeVols HostVolumeNodeUpdater } +// HostVolumeManager executes plugins, manages volume metadata in client state, +// and registers volumes with the client node. type HostVolumeManager struct { pluginDir string sharedMountDir string stateMgr HostVolumeStateManager updateNodeVols HostVolumeNodeUpdater + builtIns map[string]HostVolumePlugin log hclog.Logger } +// NewHostVolumeManager includes default builtin plugins. func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager { - // db TODO(1.10.0): document plugin config options + logger = logger.Named("host_volume_manager") return &HostVolumeManager{ pluginDir: config.PluginDir, sharedMountDir: config.SharedMountDir, stateMgr: config.StateMgr, updateNodeVols: config.UpdateNodeVols, - log: logger.Named("host_volume_manager"), + builtIns: map[string]HostVolumePlugin{ + HostVolumePluginMkdirID: &HostVolumePluginMkdir{ + ID: HostVolumePluginMkdirID, + TargetPath: config.SharedMountDir, + log: logger.With("plugin_id", HostVolumePluginMkdirID), + }, + }, + log: logger, } } -func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig { - if req == nil || resp == nil { - return nil - } - return &structs.ClientHostVolumeConfig{ - Name: req.Name, - ID: req.ID, - Path: resp.Path, - - // dynamic volumes, like CSI, have more robust `capabilities`, - // so we always set ReadOnly to false, and let the scheduler - // decide when to ignore this and check capabilities instead. 
- ReadOnly: false, - } -} - -func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, error) { - vols, err := hvm.stateMgr.GetDynamicHostVolumes() - if err != nil { - return nil, err - } - - volumes := make(VolumeMap) - var mut sync.Mutex - - if len(vols) == 0 { - return volumes, nil // nothing to do - } - - // re-"create" the volumes - plugins have the best knowledge of their - // side effects, and they must be idempotent. - group := multierror.Group{} - for _, vol := range vols { - group.Go(func() error { // db TODO(1.10.0): document that plugins must be safe to run concurrently - // missing plugins with associated volumes in state are considered - // client-stopping errors. they need to be fixed by cluster admins. - plug, err := hvm.getPlugin(vol.CreateReq.PluginID) - if err != nil { - return err - } - - resp, err := plug.Create(ctx, vol.CreateReq) - if err != nil { - // plugin execution errors are only logged - hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err) - return nil - } - mut.Lock() - volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp) - mut.Unlock() - return nil - }) - } - mErr := group.Wait() - return volumes, helper.FlattenMultierror(mErr.ErrorOrNil()) -} - -func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) { - log := hvm.log.With("plugin_id", id) - - if id == HostVolumePluginMkdirID { - return &HostVolumePluginMkdir{ - ID: HostVolumePluginMkdirID, - TargetPath: hvm.sharedMountDir, - log: log, - }, nil - } - - path := filepath.Join(hvm.pluginDir, id) - return NewHostVolumePluginExternal(log, id, path, hvm.sharedMountDir) -} - +// Create runs the appropriate plugin for the given request, saves the request +// to state, and updates the node with the volume. func (hvm *HostVolumeManager) Create(ctx context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*cstructs.ClientHostVolumeCreateResponse, error) { @@ -181,6 +124,8 @@ func (hvm *HostVolumeManager) Create(ctx context.Context, return resp, nil } +// Delete runs the appropriate plugin for the given request, removes it from +// state, and updates the node to remove the volume. func (hvm *HostVolumeManager) Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) { @@ -208,3 +153,71 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context, return resp, nil } + +// getPlugin finds either a built-in plugin or an external plugin. +func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) { + if plug, ok := hvm.builtIns[id]; ok { + return plug, nil + } + log := hvm.log.With("plugin_id", id) + path := filepath.Join(hvm.pluginDir, id) + return NewHostVolumePluginExternal(log, id, path, hvm.sharedMountDir) +} + +// restoreFromState loads all volumes from client state and runs Create for +// each one, so volumes are restored upon agent restart or host reboot. +func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, error) { + vols, err := hvm.stateMgr.GetDynamicHostVolumes() + if err != nil { + return nil, err + } + + volumes := make(VolumeMap) + var mut sync.Mutex + + if len(vols) == 0 { + return volumes, nil // nothing to do + } + + // re-"create" the volumes - plugins have the best knowledge of their + // side effects, and they must be idempotent. 
+ group := multierror.Group{} + for _, vol := range vols { + group.Go(func() error { + // missing plugins with associated volumes in state are considered + // client-stopping errors. they need to be fixed by cluster admins. + plug, err := hvm.getPlugin(vol.CreateReq.PluginID) + if err != nil { + return err + } + + resp, err := plug.Create(ctx, vol.CreateReq) + if err != nil { + // plugin execution errors are only logged + hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err) + return nil + } + mut.Lock() + volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp) + mut.Unlock() + return nil + }) + } + mErr := group.Wait() + return volumes, helper.FlattenMultierror(mErr.ErrorOrNil()) +} + +// genVolConfig generates the host volume config for the node to report as +// available to the servers for job scheduling. +func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig { + return &structs.ClientHostVolumeConfig{ + Name: req.Name, + ID: req.ID, + Path: resp.Path, + + // dynamic volumes, like CSI, have more robust `capabilities`, + // so we always set ReadOnly to false, and let the scheduler + // decide when to ignore this and check capabilities instead. + ReadOnly: false, + } +} diff --git a/client/hostvolumemanager/host_volumes_test.go b/client/hostvolumemanager/host_volumes_test.go index 15a3a2fca..1b76be152 100644 --- a/client/hostvolumemanager/host_volumes_test.go +++ b/client/hostvolumemanager/host_volumes_test.go @@ -4,21 +4,184 @@ package hostvolumemanager import ( + "bytes" "context" + "errors" + "io" "path/filepath" "testing" "time" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-version" cstate "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" + "github.com/shoenig/test" "github.com/shoenig/test/must" ) -// db TODO(1.10.0): improve hostvolumemanager tests. 
+func TestHostVolumeManager(t *testing.T) { + log := testlog.HCLogger(t) + tmp := t.TempDir() + errDB := &cstate.ErrDB{} + memDB := cstate.NewMemDB(log) + node := newFakeNode() -func TestNewHostVolumeManager_restoreState(t *testing.T) { + hvm := NewHostVolumeManager(log, Config{ + PluginDir: "./test_fixtures", + SharedMountDir: tmp, + StateMgr: errDB, + UpdateNodeVols: node.updateVol, + }) + + plug := &fakePlugin{mountDir: tmp} + hvm.builtIns["test-plugin"] = plug + + ctx := timeout(t) + + t.Run("create", func(t *testing.T) { + // plugin doesn't exist + req := &cstructs.ClientHostVolumeCreateRequest{ + ID: "vol-id", + Name: "vol-name", + PluginID: "nope", + + RequestedCapacityMinBytes: 5, + } + _, err := hvm.Create(ctx, req) + must.ErrorIs(t, err, ErrPluginNotExists) + + // error from plugin + req.PluginID = "test-plugin" + plug.createErr = errors.New("sad create") + _, err = hvm.Create(ctx, req) + must.ErrorIs(t, err, plug.createErr) + plug.reset() + + // error saving state, then error from cleanup attempt + plug.deleteErr = errors.New("sad delete") + _, err = hvm.Create(ctx, req) + must.ErrorIs(t, err, cstate.ErrDBError) + must.ErrorIs(t, err, plug.deleteErr) + plug.reset() + + // error saving state, successful cleanup + _, err = hvm.Create(ctx, req) + must.ErrorIs(t, err, cstate.ErrDBError) + must.Eq(t, "vol-id", plug.deleted) + plug.reset() + + // happy path + hvm.stateMgr = memDB + resp, err := hvm.Create(ctx, req) + must.NoError(t, err) + must.Eq(t, &cstructs.ClientHostVolumeCreateResponse{ + VolumeName: "vol-name", + VolumeID: "vol-id", + HostPath: tmp, + CapacityBytes: 5, + }, resp) + stateDBs, err := memDB.GetDynamicHostVolumes() + must.NoError(t, err) + // should be saved to state + must.Len(t, 1, stateDBs) + must.Eq(t, "vol-id", stateDBs[0].ID) + must.Eq(t, "vol-id", stateDBs[0].CreateReq.ID) + // should be registered with node + must.MapContainsKey(t, node.vols, "vol-name", must.Sprintf("no vol-name in %+v", node.vols)) + }) + + t.Run("delete", func(t *testing.T) { + // plugin doesn't exist + req := &cstructs.ClientHostVolumeDeleteRequest{ + ID: "vol-id", + Name: "vol-name", + PluginID: "nope", + } + _, err := hvm.Delete(ctx, req) + must.ErrorIs(t, err, ErrPluginNotExists) + + // error from plugin + req.PluginID = "test-plugin" + plug.deleteErr = errors.New("sad delete") + _, err = hvm.Delete(ctx, req) + must.ErrorIs(t, err, plug.deleteErr) + plug.reset() + + // error saving state + hvm.stateMgr = errDB + _, err = hvm.Delete(ctx, req) + must.ErrorIs(t, err, cstate.ErrDBError) + + // happy path + // add stuff that should be deleted + hvm.stateMgr = memDB + must.NoError(t, memDB.PutDynamicHostVolume(&cstructs.HostVolumeState{ + ID: "vol-id", + CreateReq: &cstructs.ClientHostVolumeCreateRequest{}, + })) + node.vols = VolumeMap{ + "vol-name": &structs.ClientHostVolumeConfig{}, + } + // and delete it + resp, err := hvm.Delete(ctx, req) + must.NoError(t, err) + must.Eq(t, &cstructs.ClientHostVolumeDeleteResponse{ + VolumeName: "vol-name", + VolumeID: "vol-id", + }, resp) + must.Eq(t, VolumeMap{}, node.vols, must.Sprint("vols should be deleted from node")) + stateVols, err := memDB.GetDynamicHostVolumes() + must.NoError(t, err) + must.Nil(t, stateVols, must.Sprint("vols should be deleted from state")) + }) +} + +type fakePlugin struct { + mountDir string + created string + deleted string + fingerprintErr error + createErr error + deleteErr error +} + +func (p *fakePlugin) reset() { + p.deleted, p.fingerprintErr, p.createErr, p.deleteErr = "", nil, nil, nil +} + +func (p *fakePlugin) 
Fingerprint(_ context.Context) (*PluginFingerprint, error) { + if p.fingerprintErr != nil { + return nil, p.fingerprintErr + } + v, err := version.NewVersion("0.0.1") + return &PluginFingerprint{ + Version: v, + }, err +} + +func (p *fakePlugin) Create(_ context.Context, req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) { + if p.createErr != nil { + return nil, p.createErr + } + p.created = req.ID + return &HostVolumePluginCreateResponse{ + Path: p.mountDir, + SizeBytes: req.RequestedCapacityMinBytes, + }, nil +} + +func (p *fakePlugin) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error { + if p.deleteErr != nil { + return p.deleteErr + } + p.deleted = req.ID + return nil +} + +func TestHostVolumeManager_restoreFromState(t *testing.T) { log := testlog.HCLogger(t) vol := &cstructs.HostVolumeState{ ID: "test-vol-id", @@ -28,7 +191,18 @@ func TestNewHostVolumeManager_restoreState(t *testing.T) { PluginID: "mkdir", }, } - fNode := newFakeNode() + node := newFakeNode() + + t.Run("no vols", func(t *testing.T) { + state := cstate.NewMemDB(log) + hvm := NewHostVolumeManager(log, Config{ + StateMgr: state, + // no other fields are necessary when there are zero volumes + }) + vols, err := hvm.restoreFromState(timeout(t)) + must.NoError(t, err) + must.Eq(t, VolumeMap{}, vols) + }) t.Run("happy", func(t *testing.T) { // put our volume in state @@ -42,14 +216,12 @@ func TestNewHostVolumeManager_restoreState(t *testing.T) { hvm := NewHostVolumeManager(log, Config{ StateMgr: state, - UpdateNodeVols: fNode.updateVol, + UpdateNodeVols: node.updateVol, PluginDir: "/wherever", SharedMountDir: mountDir, }) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - vols, err := hvm.restoreFromState(ctx) + vols, err := hvm.restoreFromState(timeout(t)) must.NoError(t, err) expect := map[string]*structs.ClientHostVolumeConfig{ @@ -65,23 +237,55 @@ func TestNewHostVolumeManager_restoreState(t *testing.T) { must.DirExists(t, volPath) }) - t.Run("get error", func(t *testing.T) { + t.Run("state error", func(t *testing.T) { state := &cstate.ErrDB{} - hvm := NewHostVolumeManager(log, Config{ - StateMgr: state, - UpdateNodeVols: fNode.updateVol, - PluginDir: "/wherever", - SharedMountDir: "/wherever", - }) - ctx, cancel := context.WithTimeout(context.Background(), time.Second) - defer cancel() - vols, err := hvm.restoreFromState(ctx) - // error loading state should break the world + hvm := NewHostVolumeManager(log, Config{StateMgr: state}) + vols, err := hvm.restoreFromState(timeout(t)) must.ErrorIs(t, err, cstate.ErrDBError) must.Nil(t, vols) }) - // db TODO: test plugin error + t.Run("plugin missing", func(t *testing.T) { + state := cstate.NewMemDB(log) + vol := &cstructs.HostVolumeState{ + CreateReq: &cstructs.ClientHostVolumeCreateRequest{ + PluginID: "nonexistent-plugin", + }, + } + must.NoError(t, state.PutDynamicHostVolume(vol)) + + hvm := NewHostVolumeManager(log, Config{StateMgr: state}) + vols, err := hvm.restoreFromState(timeout(t)) + must.ErrorIs(t, err, ErrPluginNotExists) + must.MapEmpty(t, vols) + }) + + t.Run("plugin error", func(t *testing.T) { + state := cstate.NewMemDB(log) + vol := &cstructs.HostVolumeState{ + ID: "test-volume", + CreateReq: &cstructs.ClientHostVolumeCreateRequest{ + PluginID: "test-plugin", + }, + } + must.NoError(t, state.PutDynamicHostVolume(vol)) + + log, getLogs := logRecorder(t) + hvm := NewHostVolumeManager(log, Config{StateMgr: state}) + plug := &fakePlugin{ + createErr: 
errors.New("sad create"), + } + hvm.builtIns["test-plugin"] = plug + + vols, err := hvm.restoreFromState(timeout(t)) + // error during restore should not halt the whole client + must.NoError(t, err) + must.NotNil(t, vols) + // but it should log + logs := getLogs() + must.StrContains(t, logs, "[ERROR]") + must.StrContains(t, logs, `failed to restore: plugin_id=test-plugin volume_id=test-volume error="sad create"`) + }) } type fakeNode struct { @@ -97,3 +301,29 @@ func newFakeNode() *fakeNode { vols: make(VolumeMap), } } + +// timeout provides a context that times out in 1 second +func timeout(t *testing.T) context.Context { + t.Helper() + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + t.Cleanup(cancel) + return ctx +} + +// logRecorder is here so we can assert that stdout/stderr appear in logs +func logRecorder(t *testing.T) (hclog.Logger, func() string) { + t.Helper() + buf := &bytes.Buffer{} + logger := hclog.New(&hclog.LoggerOptions{ + Name: "log-recorder", + Output: buf, + Level: hclog.Debug, + IncludeLocation: true, + DisableTime: true, + }) + return logger, func() string { + bts, err := io.ReadAll(buf) + test.NoError(t, err) + return string(bts) + } +} diff --git a/client/hostvolumemanager/volume_fingerprint.go b/client/hostvolumemanager/volume_fingerprint.go index 37b0c84fb..a4e171368 100644 --- a/client/hostvolumemanager/volume_fingerprint.go +++ b/client/hostvolumemanager/volume_fingerprint.go @@ -41,6 +41,7 @@ func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolu } // WaitForFirstFingerprint implements client.FingerprintingPluginManager +// so any existing volumes are added to the client node on agent start. func (hvm *HostVolumeManager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} { // the fingerprint manager puts batchFirstFingerprintsTimeout (50 seconds) // on the context that it sends to us here so we don't need another diff --git a/client/hostvolumemanager/volume_fingerprint_test.go b/client/hostvolumemanager/volume_fingerprint_test.go index c5198eb7c..3cda6a540 100644 --- a/client/hostvolumemanager/volume_fingerprint_test.go +++ b/client/hostvolumemanager/volume_fingerprint_test.go @@ -6,6 +6,9 @@ package hostvolumemanager import ( "testing" + "github.com/hashicorp/nomad/client/state" + cstructs "github.com/hashicorp/nomad/client/structs" + "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/nomad/structs" "github.com/shoenig/test/must" ) @@ -79,3 +82,46 @@ func TestUpdateVolumeMap(t *testing.T) { }) } } + +func TestWaitForFirstFingerprint(t *testing.T) { + log := testlog.HCLogger(t) + tmp := t.TempDir() + memDB := state.NewMemDB(log) + node := newFakeNode() + hvm := NewHostVolumeManager(log, Config{ + PluginDir: "", + SharedMountDir: tmp, + StateMgr: memDB, + UpdateNodeVols: node.updateVol, + }) + plug := &fakePlugin{mountDir: tmp} + hvm.builtIns = map[string]HostVolumePlugin{ + "test-plugin": plug, + } + must.NoError(t, memDB.PutDynamicHostVolume(&cstructs.HostVolumeState{ + ID: "vol-id", + CreateReq: &cstructs.ClientHostVolumeCreateRequest{ + ID: "vol-id", + Name: "vol-name", + PluginID: "test-plugin", + }, + })) + + ctx := timeout(t) + done := hvm.WaitForFirstFingerprint(ctx) + select { + case <-ctx.Done(): + t.Fatal("fingerprint timed out") + case <-done: + } + + must.Eq(t, "vol-id", plug.created) + must.Eq(t, VolumeMap{ + "vol-name": &structs.ClientHostVolumeConfig{ + Name: "vol-name", + ID: "vol-id", + Path: tmp, + ReadOnly: false, + }, + }, node.vols) +} diff --git 
a/command/agent/config.go b/command/agent/config.go index acfb9bc63..28a6afb67 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -230,7 +230,6 @@ type ClientConfig struct { AllocMountsDir string `hcl:"alloc_mounts_dir"` // HostVolumePluginDir directory contains dynamic host volume plugins - // db TODO(1.10.0): document default directory is alongside alloc_mounts HostVolumePluginDir string `hcl:"host_volume_plugin_dir"` // Servers is a list of known server addresses. These are as "host:port" diff --git a/demo/hostvolume/_test-plugin.sh b/demo/hostvolume/_test-plugin.sh index 864680e23..791f46787 100755 --- a/demo/hostvolume/_test-plugin.sh +++ b/demo/hostvolume/_test-plugin.sh @@ -56,7 +56,6 @@ case $op in export CAPACITY_MAX_BYTES=50000000 # 50mb export CAPACITY_MIN_BYTES=50000000 # 50mb export PARAMETERS='{"a": "ayy"}' - # db TODO(1.10.0): check stdout ;; delete) diff --git a/demo/hostvolume/example-plugin-mkfs b/demo/hostvolume/example-plugin-mkfs index 5bfaa4e47..5e92ff870 100755 --- a/demo/hostvolume/example-plugin-mkfs +++ b/demo/hostvolume/example-plugin-mkfs @@ -2,8 +2,6 @@ # Copyright (c) HashiCorp, Inc. # SPDX-License-Identifier: BUSL-1.1 -# db TODO(1.10.0): where does PATH come from here? somewhere implicit? /sbin/ and /bin/ and ...? - set -euo pipefail version='0.0.1'
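
For reference, a minimal external plugin that satisfies the fingerprint/create/delete contract documented on HostVolumePluginExternal above might look like the sketch below. It is illustrative only: the plugin name and the use of mkdir/rm are assumptions, not part of this changeset; see demo/hostvolume/example-plugin-mkfs and the test_plugin.sh fixture for fuller examples.

#!/usr/bin/env bash
# Sketch of an external dynamic host volume plugin. The operation arrives as
# the first argument, and HOST_PATH (among other variables) arrives in the
# environment, per the Create/Delete doc comments above. Illustrative only.
set -euo pipefail

op="${1:-}"

case "$op" in
  fingerprint)
    # Nomad parses stdout as JSON with a "version" key.
    echo '{"version": "0.0.1"}'
    ;;
  create)
    # HOST_PATH is also passed as the second argument; both refer to the
    # path Nomad expects the plugin to create. mkdir -p keeps this idempotent.
    mkdir -p "$HOST_PATH"
    # Respond with the path actually created and its size in bytes;
    # "bytes" defaults to 0 if omitted.
    echo "{\"path\": \"$HOST_PATH\", \"bytes\": ${CAPACITY_MIN_BYTES:-0}}"
    ;;
  delete)
    # rm -rf keeps delete idempotent; stdout is discarded for delete.
    rm -rf "$HOST_PATH"
    ;;
  *)
    echo "unknown operation: $op" 1>&2
    exit 1
    ;;
esac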
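
And roughly how runPlugin invokes such an executable for a create operation: the documented environment variables are set, and the operation plus host path are passed as arguments. Every value below is a placeholder, and the executable name is hypothetical.

# HOST_PATH is {TargetPath}/{volume ID}; parameters mirror the demo's
# _test-plugin.sh values. Placeholder data only.
OPERATION=create \
HOST_PATH=/tmp/nomad-volumes/vol-id \
NODE_ID=00000000-0000-0000-0000-000000000000 \
VOLUME_NAME=example-vol \
CAPACITY_MIN_BYTES=50000000 \
CAPACITY_MAX_BYTES=50000000 \
PARAMETERS='{"a": "ayy"}' \
  ./example-host-volume-plugin create /tmp/nomad-volumes/vol-id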