diff --git a/client/client.go b/client/client.go
index 4fc808eca..ac4a7dad7 100644
--- a/client/client.go
+++ b/client/client.go
@@ -535,9 +535,16 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
     c.devicemanager = devManager
     c.pluginManagers.RegisterAndRun(devManager)
 
-    c.hostVolumeManager = hvm.NewHostVolumeManager(logger,
+    c.hostVolumeManager, err = hvm.NewHostVolumeManager(logger,
+        c.stateDB, hostVolumeRequestTimeout,
         cfg.HostVolumePluginDir,
         cfg.AllocMountsDir)
+    if err != nil {
+        // NewHostVolumeManager will only error if it fails to read the state
+        // store, or if one or more required plugins do not exist, so halt the
+        // client, because something needs to be fixed by a cluster admin.
+        return nil, err
+    }
 
     // Set up the service registration wrapper using the Consul and Nomad
     // implementations. The Nomad implementation is only ever used on the
diff --git a/client/host_volume_endpoint_test.go b/client/host_volume_endpoint_test.go
index 037880246..7fbc042d8 100644
--- a/client/host_volume_endpoint_test.go
+++ b/client/host_volume_endpoint_test.go
@@ -6,9 +6,11 @@ package client
 import (
     "path/filepath"
     "testing"
+    "time"
 
     "github.com/hashicorp/nomad/ci"
     hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
+    "github.com/hashicorp/nomad/client/state"
     cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/helper/testlog"
     "github.com/shoenig/test/must"
@@ -20,10 +22,15 @@ func TestHostVolume(t *testing.T) {
     client, cleanup := TestClient(t, nil)
     defer cleanup()
 
+    memdb := state.NewMemDB(testlog.HCLogger(t))
+    client.stateDB = memdb
+
     tmp := t.TempDir()
+    var err error
     expectDir := filepath.Join(tmp, "test-vol-id")
-    client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
-        "/no/ext/plugins", tmp)
+    client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
+        client.stateDB, time.Second, "/no/ext/plugins", tmp)
+    must.NoError(t, err)
 
     t.Run("happy", func(t *testing.T) {
         req := &cstructs.ClientHostVolumeCreateRequest{
@@ -40,6 +47,15 @@ func TestHostVolume(t *testing.T) {
         }, resp)
         // technically this is testing "mkdir" more than the RPC
         must.DirExists(t, expectDir)
+        // ensure we saved to client state
+        vols, err := memdb.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 1, vols)
+        expectState := &cstructs.HostVolumeState{
+            ID:        req.ID,
+            CreateReq: req,
+        }
+        must.Eq(t, expectState, vols[0])
 
         delReq := &cstructs.ClientHostVolumeDeleteRequest{
             ID: "test-vol-id",
@@ -52,6 +68,10 @@ func TestHostVolume(t *testing.T) {
         must.NotNil(t, delResp)
         // again, actually testing the "mkdir" plugin
         must.DirNotExists(t, expectDir)
+        // client state should be deleted
+        vols, err = memdb.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 0, vols)
     })
 
     t.Run("missing plugin", func(t *testing.T) {
@@ -72,8 +92,9 @@ func TestHostVolume(t *testing.T) {
 
     t.Run("error from plugin", func(t *testing.T) {
         // "mkdir" plugin can't create a directory within a file
-        client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
-            "/no/ext/plugins", "host_volume_endpoint_test.go")
+        client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
+            client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
+        must.NoError(t, err)
 
         req := &cstructs.ClientHostVolumeCreateRequest{
             ID: "test-vol-id",
diff --git a/client/hostvolumemanager/host_volume_plugin.go b/client/hostvolumemanager/host_volume_plugin.go
index 0616ce337..17cdf0279 100644
--- a/client/hostvolumemanager/host_volume_plugin.go
+++ b/client/hostvolumemanager/host_volume_plugin.go
@@ -29,9 +29,8 @@ type HostVolumePlugin interface {
 }
 
 type HostVolumePluginCreateResponse struct {
-    Path      string            `json:"path"`
-    SizeBytes int64             `json:"bytes"`
-    Context   map[string]string `json:"context"` // metadata
+    Path      string `json:"path"`
+    SizeBytes int64  `json:"bytes"`
 }
 
 const HostVolumePluginMkdirID = "mkdir"
@@ -70,7 +69,6 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
     return &HostVolumePluginCreateResponse{
         Path:      path,
         SizeBytes: 0,
-        Context:   map[string]string{},
     }, nil
 }
 
@@ -147,8 +145,9 @@ func (p *HostVolumePluginExternal) Version(ctx context.Context) (*version.Versio
 
 func (p *HostVolumePluginExternal) Create(ctx context.Context,
     req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
-    params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): if this is nil, then PARAMETERS env will be "null"
+    params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): document that if this is nil, the PARAMETERS env will be "null"
     if err != nil {
+        // this is a proper error, because users can set this in the volume spec
         return nil, fmt.Errorf("error marshaling volume pramaters: %w", err)
     }
     envVars := []string{
@@ -165,7 +164,7 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
     }
 
     var pluginResp HostVolumePluginCreateResponse
-    err = json.Unmarshal(stdout, &pluginResp)
+    err = json.Unmarshal(stdout, &pluginResp) // db TODO(1.10.0): if this fails, the volume may have been created according to the plugin, but Nomad will not save it
     if err != nil {
         return nil, err
     }
diff --git a/client/hostvolumemanager/host_volume_plugin_test.go b/client/hostvolumemanager/host_volume_plugin_test.go
index 954686d74..18de2e1f3 100644
--- a/client/hostvolumemanager/host_volume_plugin_test.go
+++ b/client/hostvolumemanager/host_volume_plugin_test.go
@@ -45,7 +45,6 @@ func TestHostVolumePluginMkdir(t *testing.T) {
     must.Eq(t, &HostVolumePluginCreateResponse{
         Path:      target,
         SizeBytes: 0,
-        Context:   map[string]string{},
     }, resp)
     must.DirExists(t, target)
 
@@ -115,7 +114,6 @@ func TestHostVolumePluginExternal(t *testing.T) {
     must.Eq(t, &HostVolumePluginCreateResponse{
         Path:      target,
        SizeBytes: 5,
-        Context:   map[string]string{"key": "val"},
     }, resp)
     must.DirExists(t, target)
     logged := getLogs()
diff --git a/client/hostvolumemanager/host_volumes.go b/client/hostvolumemanager/host_volumes.go
index 39ab9bb89..9827084a7 100644
--- a/client/hostvolumemanager/host_volumes.go
+++ b/client/hostvolumemanager/host_volumes.go
@@ -7,9 +7,12 @@ import (
     "context"
     "errors"
     "path/filepath"
+    "time"
 
     "github.com/hashicorp/go-hclog"
+    "github.com/hashicorp/go-multierror"
     cstructs "github.com/hashicorp/nomad/client/structs"
+    "github.com/hashicorp/nomad/helper"
 )
 
 var (
@@ -17,22 +20,73 @@ var (
     ErrPluginNotExecutable = errors.New("plugin not executable")
 )
 
+type HostVolumeStateManager interface {
+    PutDynamicHostVolume(*cstructs.HostVolumeState) error
+    GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
+    DeleteDynamicHostVolume(string) error
+}
+
 type HostVolumeManager struct {
     pluginDir      string
     sharedMountDir string
+    stateMgr       HostVolumeStateManager
 
     log hclog.Logger
 }
 
-func NewHostVolumeManager(logger hclog.Logger, pluginDir, sharedMountDir string) *HostVolumeManager {
+func NewHostVolumeManager(logger hclog.Logger,
+    state HostVolumeStateManager, restoreTimeout time.Duration,
+    pluginDir, sharedMountDir string) (*HostVolumeManager, error) {
+
     log := logger.Named("host_volume_mgr")
logger.Named("host_volume_mgr") // db TODO(1.10.0): how do we define the external mounter plugins? plugin configs? - return &HostVolumeManager{ - log: log, + hvm := &HostVolumeManager{ pluginDir: pluginDir, sharedMountDir: sharedMountDir, + stateMgr: state, + log: log, } + + if err := hvm.restoreState(state, restoreTimeout); err != nil { + return nil, err + } + + return hvm, nil +} + +func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout time.Duration) error { + vols, err := state.GetDynamicHostVolumes() + if err != nil { + return err + } + if len(vols) == 0 { + return nil // nothing to do + } + + // re-"create" the volumes - plugins have the best knowledge of their + // side effects, and they must be idempotent. + group := multierror.Group{} + for _, vol := range vols { + group.Go(func() error { // db TODO(1.10.0): document that plugins must be safe to run concurrently + // missing plugins with associated volumes in state are considered + // client-stopping errors. they need to be fixed by cluster admins. + plug, err := hvm.getPlugin(vol.CreateReq.PluginID) + if err != nil { + return err + } + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + if _, err := plug.Create(ctx, vol.CreateReq); err != nil { + // plugin execution errors are only logged + hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err) + } + return nil + }) + } + mErr := group.Wait() + return helper.FlattenMultierror(mErr.ErrorOrNil()) } func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) { @@ -63,14 +117,35 @@ func (hvm *HostVolumeManager) Create(ctx context.Context, return nil, err } + volState := &cstructs.HostVolumeState{ + ID: req.ID, + CreateReq: req, + } + if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil { + // if we fail to write to state, delete the volume so it isn't left + // lying around without Nomad knowing about it. + hvm.log.Error("failed to save volume in state, so deleting", "volume_id", req.ID, "error", err) + delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{ + ID: req.ID, + PluginID: req.PluginID, + NodeID: req.NodeID, + HostPath: hvm.sharedMountDir, + Parameters: req.Parameters, + }) + if delErr != nil { + hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr) + err = multierror.Append(err, delErr) + } + return nil, helper.FlattenMultierror(err) + } + + // db TODO(1.10.0): now we need to add the volume to the node fingerprint! + resp := &cstructs.ClientHostVolumeCreateResponse{ HostPath: pluginResp.Path, CapacityBytes: pluginResp.SizeBytes, } - // db TODO(1.10.0): now we need to add it to the node fingerprint! - // db TODO(1.10.0): and save it in client state! - return resp, nil } @@ -89,7 +164,10 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context, resp := &cstructs.ClientHostVolumeDeleteResponse{} - // db TODO(1.10.0): save the client state! + if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil { + hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err) + return nil, err // bail so a user may retry + } return resp, nil } diff --git a/client/hostvolumemanager/host_volumes_test.go b/client/hostvolumemanager/host_volumes_test.go new file mode 100644 index 000000000..a94526416 --- /dev/null +++ b/client/hostvolumemanager/host_volumes_test.go @@ -0,0 +1,53 @@ +// Copyright (c) HashiCorp, Inc. 
+// SPDX-License-Identifier: BUSL-1.1
+
+package hostvolumemanager
+
+import (
+    "path/filepath"
+    "testing"
+    "time"
+
+    cstate "github.com/hashicorp/nomad/client/state"
+    cstructs "github.com/hashicorp/nomad/client/structs"
+    "github.com/hashicorp/nomad/helper/testlog"
+    "github.com/shoenig/test/must"
+)
+
+// db TODO(1.10.0): improve hostvolumemanager tests.
+
+func TestNewHostVolumeManager_restoreState(t *testing.T) {
+    log := testlog.HCLogger(t)
+    vol := &cstructs.HostVolumeState{
+        ID: "test-vol-id",
+        CreateReq: &cstructs.ClientHostVolumeCreateRequest{
+            ID:       "test-vol-id",
+            PluginID: "mkdir",
+        },
+    }
+
+    t.Run("happy", func(t *testing.T) {
+        // put our volume in state
+        state := cstate.NewMemDB(log)
+        must.NoError(t, state.PutDynamicHostVolume(vol))
+
+        // new volume manager should load it from state and run Create,
+        // resulting in a volume directory in this mountDir.
+        mountDir := t.TempDir()
+
+        _, err := NewHostVolumeManager(log, state, time.Second, "/wherever", mountDir)
+        must.NoError(t, err)
+
+        volPath := filepath.Join(mountDir, vol.ID)
+        must.DirExists(t, volPath)
+    })
+
+    t.Run("get error", func(t *testing.T) {
+        state := &cstate.ErrDB{}
+        _, err := NewHostVolumeManager(log, state, time.Second, "/wherever", "/wherever")
+        // error loading state should break the world
+        must.ErrorIs(t, err, cstate.ErrDBError)
+    })
+
+    // db TODO(1.10.0): test plugin error
+}
diff --git a/client/state/db_bolt.go b/client/state/db_bolt.go
index 2471cda3d..bef111f6e 100644
--- a/client/state/db_bolt.go
+++ b/client/state/db_bolt.go
@@ -138,6 +138,8 @@ var (
 
     // nodeRegistrationKey is the key at which node registration data is stored.
     nodeRegistrationKey = []byte("node_registration")
+
+    hostVolBucket = []byte("host_volumes_to_create")
 )
 
 // taskBucketName returns the bucket name for the given task name.
@@ -1048,6 +1050,49 @@ func (s *BoltStateDB) GetNodeRegistration() (*cstructs.NodeRegistration, error)
     return &reg, err
 }
 
+func (s *BoltStateDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error {
+    return s.db.Update(func(tx *boltdd.Tx) error {
+        b, err := tx.CreateBucketIfNotExists(hostVolBucket)
+        if err != nil {
+            return err
+        }
+        return b.Put([]byte(vol.ID), vol)
+    })
+}
+
+func (s *BoltStateDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
+    var vols []*cstructs.HostVolumeState
+    err := s.db.View(func(tx *boltdd.Tx) error {
+        b := tx.Bucket(hostVolBucket)
+        if b == nil {
+            return nil
+        }
+        return b.BoltBucket().ForEach(func(k, v []byte) error {
+            var vol cstructs.HostVolumeState
+            err := b.Get(k, &vol)
+            if err != nil {
+                return err
+            }
+            vols = append(vols, &vol)
+            return nil
+        })
+    })
+    if boltdd.IsErrNotFound(err) {
+        return nil, nil
+    }
+    return vols, err
+}
+
+func (s *BoltStateDB) DeleteDynamicHostVolume(id string) error {
+    return s.db.Update(func(tx *boltdd.Tx) error {
+        // tx.Bucket returns nil if the bucket has never been created,
+        // as in GetDynamicHostVolumes above, so guard against that here
+        // rather than dereferencing a nil bucket.
+        b := tx.Bucket(hostVolBucket)
+        if b == nil {
+            return nil // nothing to delete
+        }
+        return b.Delete([]byte(id))
+    })
+}
+
 // init initializes metadata entries in a newly created state database.
 func (s *BoltStateDB) init() error {
     return s.db.Update(func(tx *boltdd.Tx) error {
diff --git a/client/state/db_error.go b/client/state/db_error.go
index 78ef01b78..6c99defa2 100644
--- a/client/state/db_error.go
+++ b/client/state/db_error.go
@@ -4,6 +4,7 @@
 package state
 
 import (
+    "errors"
     "fmt"
 
     arstate "github.com/hashicorp/nomad/client/allocrunner/state"
@@ -16,6 +17,10 @@ import (
     "github.com/hashicorp/nomad/nomad/structs"
 )
 
+var _ StateDB = &ErrDB{}
+
+var ErrDBError = errors.New("Error!")
+
 // ErrDB implements a StateDB that returns errors on restore methods, used for testing
 type ErrDB struct {
     // Allocs is a preset slice of allocations used in GetAllAllocations
@@ -154,6 +159,16 @@ func (m *ErrDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
     return nil, fmt.Errorf("Error!")
 }
 
+func (m *ErrDB) PutDynamicHostVolume(_ *cstructs.HostVolumeState) error {
+    return ErrDBError
+}
+func (m *ErrDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
+    return nil, ErrDBError
+}
+func (m *ErrDB) DeleteDynamicHostVolume(_ string) error {
+    return ErrDBError
+}
+
 func (m *ErrDB) Close() error {
     return fmt.Errorf("Error!")
 }
diff --git a/client/state/db_mem.go b/client/state/db_mem.go
index 91e6481b4..32abd883e 100644
--- a/client/state/db_mem.go
+++ b/client/state/db_mem.go
@@ -60,6 +60,8 @@ type MemDB struct {
 
     nodeRegistration *cstructs.NodeRegistration
 
+    dynamicHostVolumes map[string]*cstructs.HostVolumeState
+
     logger hclog.Logger
 
     mu sync.RWMutex
@@ -68,15 +70,16 @@ type MemDB struct {
 func NewMemDB(logger hclog.Logger) *MemDB {
     logger = logger.Named("memdb")
     return &MemDB{
-        allocs:            make(map[string]*structs.Allocation),
-        deployStatus:      make(map[string]*structs.AllocDeploymentStatus),
-        networkStatus:     make(map[string]*structs.AllocNetworkStatus),
-        acknowledgedState: make(map[string]*arstate.State),
-        localTaskState:    make(map[string]map[string]*state.LocalState),
-        taskState:         make(map[string]map[string]*structs.TaskState),
-        checks:            make(checks.ClientResults),
-        identities:        make(map[string][]*structs.SignedWorkloadIdentity),
-        logger:            logger,
+        allocs:             make(map[string]*structs.Allocation),
+        deployStatus:       make(map[string]*structs.AllocDeploymentStatus),
+        networkStatus:      make(map[string]*structs.AllocNetworkStatus),
+        acknowledgedState:  make(map[string]*arstate.State),
+        localTaskState:     make(map[string]map[string]*state.LocalState),
+        taskState:          make(map[string]map[string]*structs.TaskState),
+        checks:             make(checks.ClientResults),
+        identities:         make(map[string][]*structs.SignedWorkloadIdentity),
+        dynamicHostVolumes: make(map[string]*cstructs.HostVolumeState),
+        logger:             logger,
     }
 }
@@ -354,6 +357,28 @@ func (m *MemDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
     return m.nodeRegistration, nil
 }
 
+func (m *MemDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    m.dynamicHostVolumes[vol.ID] = vol
+    return nil
+}
+func (m *MemDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    var vols []*cstructs.HostVolumeState
+    for _, vol := range m.dynamicHostVolumes {
+        vols = append(vols, vol)
+    }
+    return vols, nil
+}
+func (m *MemDB) DeleteDynamicHostVolume(s string) error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    delete(m.dynamicHostVolumes, s)
+    return nil
+}
+
 func (m *MemDB) Close() error {
     m.mu.Lock()
     defer m.mu.Unlock()
diff --git a/client/state/db_noop.go b/client/state/db_noop.go
index 345025a4d..09488c181 100644
--- a/client/state/db_noop.go
+++ b/client/state/db_noop.go
@@ -14,6 +14,8 @@ import (
     "github.com/hashicorp/nomad/nomad/structs"
 )
 
+var _ StateDB = &NoopDB{}
+
 // NoopDB implements a StateDB that does not persist any data.
 type NoopDB struct{}
 
@@ -145,6 +147,16 @@ func (n NoopDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
     return nil, nil
 }
 
+func (n NoopDB) PutDynamicHostVolume(_ *cstructs.HostVolumeState) error {
+    return nil
+}
+func (n NoopDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
+    return nil, nil
+}
+func (n NoopDB) DeleteDynamicHostVolume(_ string) error {
+    return nil
+}
+
 func (n NoopDB) Close() error {
     return nil
 }
diff --git a/client/state/db_test.go b/client/state/db_test.go
index d13431a62..3a03cf3a2 100644
--- a/client/state/db_test.go
+++ b/client/state/db_test.go
@@ -15,6 +15,7 @@ import (
     dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
     "github.com/hashicorp/nomad/client/dynamicplugins"
     driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
+    cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/helper/testlog"
     "github.com/hashicorp/nomad/nomad/mock"
     "github.com/hashicorp/nomad/nomad/structs"
@@ -384,6 +385,41 @@ func TestStateDB_DynamicRegistry(t *testing.T) {
     })
 }
 
+// TestStateDB_HostVolumes asserts the behavior of dynamic host volume state.
+func TestStateDB_HostVolumes(t *testing.T) {
+    ci.Parallel(t)
+
+    testDB(t, func(t *testing.T, db StateDB) {
+        vols, err := db.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 0, vols)
+
+        vol := &cstructs.HostVolumeState{
+            ID: "test-vol-id",
+            CreateReq: &cstructs.ClientHostVolumeCreateRequest{
+                ID:                        "test-vol-id",
+                Name:                      "test-vol-name",
+                PluginID:                  "test-plugin-id",
+                NodeID:                    "test-node-id",
+                RequestedCapacityMinBytes: 5,
+                RequestedCapacityMaxBytes: 10,
+                Parameters:                map[string]string{"test": "ing"},
+            },
+        }
+
+        must.NoError(t, db.PutDynamicHostVolume(vol))
+        vols, err = db.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 1, vols)
+        must.Eq(t, vol, vols[0])
+
+        must.NoError(t, db.DeleteDynamicHostVolume(vol.ID))
+        vols, err = db.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 0, vols)
+    })
+}
+
 func TestStateDB_CheckResult_keyForCheck(t *testing.T) {
     ci.Parallel(t)
 
diff --git a/client/state/interface.go b/client/state/interface.go
index a9cd48450..0460a75e2 100644
--- a/client/state/interface.go
+++ b/client/state/interface.go
@@ -137,6 +137,10 @@ type StateDB interface {
     PutNodeRegistration(*cstructs.NodeRegistration) error
     GetNodeRegistration() (*cstructs.NodeRegistration, error)
 
+    PutDynamicHostVolume(*cstructs.HostVolumeState) error
+    GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
+    DeleteDynamicHostVolume(string) error
+
     // Close the database. Unsafe for further use after calling regardless
     // of return value.
     Close() error
diff --git a/client/structs/host_volumes.go b/client/structs/host_volumes.go
index 38d3cb2d7..3188e45dc 100644
--- a/client/structs/host_volumes.go
+++ b/client/structs/host_volumes.go
@@ -3,6 +3,11 @@
 
 package structs
 
+type HostVolumeState struct {
+    ID        string
+    CreateReq *ClientHostVolumeCreateRequest
+}
+
 type ClientHostVolumeCreateRequest struct {
     // ID is a UUID-like string generated by the server.
     ID string
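
Review note, not part of the patch: the trailing TODO in host_volumes_test.go ("test plugin error") could be covered with the same trick host_volume_endpoint_test.go uses, pointing the mkdir plugin at a regular file. A minimal sketch, reusing `log` and `vol` from TestNewHostVolumeManager_restoreState; it assumes restoreState keeps its current behavior of only logging plugin execution errors rather than failing the client:

```go
	t.Run("plugin error", func(t *testing.T) {
		state := cstate.NewMemDB(log)
		must.NoError(t, state.PutDynamicHostVolume(vol))

		// "mkdir" can't create a directory within a regular file, so the
		// re-run of Create fails during restore; restoreState only logs
		// plugin execution errors, so the manager should still come up.
		hvm, err := NewHostVolumeManager(log, state, time.Second,
			"/wherever", "host_volumes_test.go")
		must.NoError(t, err)
		must.NotNil(t, hvm)
	})
```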
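
Similarly, the new cleanup branch in HostVolumeManager.Create (deleting the volume again when the state write fails) is only reachable with a failing state store, which ErrDB now provides. A hedged sketch under those assumptions; the test name is hypothetical, it assumes package hostvolumemanager with the imports already used in host_volumes_test.go plus "context", and it builds the manager directly because NewHostVolumeManager would error out in restoreState when GetDynamicHostVolumes returns ErrDBError:

```go
func TestHostVolumeManager_createStateFailure(t *testing.T) {
	mountDir := t.TempDir()

	hvm := &HostVolumeManager{
		pluginDir:      "/wherever",
		sharedMountDir: mountDir,
		stateMgr:       &cstate.ErrDB{},
		log:            testlog.HCLogger(t),
	}

	_, err := hvm.Create(context.Background(), &cstructs.ClientHostVolumeCreateRequest{
		ID:       "test-vol-id",
		PluginID: "mkdir",
	})

	// PutDynamicHostVolume fails, so Create should surface ErrDBError...
	must.ErrorIs(t, err, cstate.ErrDBError)
	// ...and the directory the mkdir plugin created should be gone again.
	must.DirNotExists(t, filepath.Join(mountDir, "test-vol-id"))
}
```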