From 05f1cda594634edf99f1ffff00f5e21b41870253 Mon Sep 17 00:00:00 2001
From: Daniel Bennett
Date: Tue, 3 Dec 2024 16:47:37 -0500
Subject: [PATCH] dynamic host volumes: client state (#24595)

Store dynamic host volume creations in client state, so they can be
"restored" on agent restart.

Restore works by repeating the same Create operation as the initial
creation and expecting the plugin to be idempotent. This is especially
important after host restarts, which may have dropped mount points or
similar.
---
 client/client.go                              |  9 +-
 client/host_volume_endpoint_test.go           | 29 +++++-
 .../hostvolumemanager/host_volume_plugin.go   | 11 +--
 .../host_volume_plugin_test.go                |  2 -
 client/hostvolumemanager/host_volumes.go      | 92 +++++++++++++++++--
 client/hostvolumemanager/host_volumes_test.go | 53 +++++++++++
 client/state/db_bolt.go                       | 41 +++++++++
 client/state/db_error.go                      | 15 +++
 client/state/db_mem.go                        | 43 +++++++--
 client/state/db_noop.go                       | 12 +++
 client/state/db_test.go                       | 36 ++++++++
 client/state/interface.go                     |  4 +
 client/structs/host_volumes.go                |  5 +
 13 files changed, 323 insertions(+), 29 deletions(-)
 create mode 100644 client/hostvolumemanager/host_volumes_test.go

diff --git a/client/client.go b/client/client.go
index 4fc808eca..ac4a7dad7 100644
--- a/client/client.go
+++ b/client/client.go
@@ -535,9 +535,16 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
     c.devicemanager = devManager
     c.pluginManagers.RegisterAndRun(devManager)
 
-    c.hostVolumeManager = hvm.NewHostVolumeManager(logger,
+    c.hostVolumeManager, err = hvm.NewHostVolumeManager(logger,
+        c.stateDB, hostVolumeRequestTimeout,
         cfg.HostVolumePluginDir,
         cfg.AllocMountsDir)
+    if err != nil {
+        // NewHostVolumeManager will only err if it fails to read the state
+        // store, or if one or more required plugins do not exist, so halt the
+        // client because something needs to be fixed by a cluster admin.
+        return nil, err
+    }
 
     // Set up the service registration wrapper using the Consul and Nomad
     // implementations. The Nomad implementation is only ever used on the
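The halt-on-error behavior above only works because restore leans entirely on plugin idempotency: NewHostVolumeManager replays Create for every volume already in client state, so a Create that has already succeeded must succeed again with the same result. A minimal sketch of a plugin Create that satisfies the contract (examplePlugin and targetDir are illustrative, not part of this patch; the patch's real built-in is HostVolumePluginMkdir):

    package hostvolumemanager

    import (
        "context"
        "os"
        "path/filepath"

        cstructs "github.com/hashicorp/nomad/client/structs"
    )

    // examplePlugin is illustrative only; targetDir plays the role of
    // the manager's sharedMountDir.
    type examplePlugin struct{ targetDir string }

    // Create is safe to repeat: MkdirAll succeeds without error when the
    // directory already exists, so restoreState can replay it after an
    // agent or host restart and get the same path back.
    func (p *examplePlugin) Create(_ context.Context,
        req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {

        path := filepath.Join(p.targetDir, req.ID)
        if err := os.MkdirAll(path, 0o700); err != nil {
            return nil, err
        }
        return &HostVolumePluginCreateResponse{Path: path, SizeBytes: 0}, nil
    }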
diff --git a/client/host_volume_endpoint_test.go b/client/host_volume_endpoint_test.go
index 037880246..7fbc042d8 100644
--- a/client/host_volume_endpoint_test.go
+++ b/client/host_volume_endpoint_test.go
@@ -6,9 +6,11 @@ package client
 import (
     "path/filepath"
     "testing"
+    "time"
 
     "github.com/hashicorp/nomad/ci"
     hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
+    "github.com/hashicorp/nomad/client/state"
     cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/helper/testlog"
     "github.com/shoenig/test/must"
@@ -20,10 +22,15 @@ func TestHostVolume(t *testing.T) {
     client, cleanup := TestClient(t, nil)
     defer cleanup()
 
+    memdb := state.NewMemDB(testlog.HCLogger(t))
+    client.stateDB = memdb
+
     tmp := t.TempDir()
+    var err error
     expectDir := filepath.Join(tmp, "test-vol-id")
-    client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
-        "/no/ext/plugins", tmp)
+    client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
+        client.stateDB, time.Second, "/no/ext/plugins", tmp)
+    must.NoError(t, err)
 
     t.Run("happy", func(t *testing.T) {
         req := &cstructs.ClientHostVolumeCreateRequest{
@@ -40,6 +47,15 @@ func TestHostVolume(t *testing.T) {
         }, resp)
         // technically this is testing "mkdir" more than the RPC
         must.DirExists(t, expectDir)
+        // ensure we saved to client state
+        vols, err := memdb.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 1, vols)
+        expectState := &cstructs.HostVolumeState{
+            ID:        req.ID,
+            CreateReq: req,
+        }
+        must.Eq(t, expectState, vols[0])
 
         delReq := &cstructs.ClientHostVolumeDeleteRequest{
             ID: "test-vol-id",
@@ -52,6 +68,10 @@ func TestHostVolume(t *testing.T) {
         must.NotNil(t, delResp)
         // again, actually testing the "mkdir" plugin
         must.DirNotExists(t, expectDir)
+        // client state should be deleted
+        vols, err = memdb.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 0, vols)
     })
 
     t.Run("missing plugin", func(t *testing.T) {
@@ -72,8 +92,9 @@ func TestHostVolume(t *testing.T) {
 
     t.Run("error from plugin", func(t *testing.T) {
         // "mkdir" plugin can't create a directory within a file
-        client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
-            "/no/ext/plugins", "host_volume_endpoint_test.go")
+        client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
+            client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
+        must.NoError(t, err)
 
         req := &cstructs.ClientHostVolumeCreateRequest{
             ID: "test-vol-id",
diff --git a/client/hostvolumemanager/host_volume_plugin.go b/client/hostvolumemanager/host_volume_plugin.go
index 0616ce337..17cdf0279 100644
--- a/client/hostvolumemanager/host_volume_plugin.go
+++ b/client/hostvolumemanager/host_volume_plugin.go
@@ -29,9 +29,8 @@ type HostVolumePlugin interface {
 }
 
 type HostVolumePluginCreateResponse struct {
-    Path      string            `json:"path"`
-    SizeBytes int64             `json:"bytes"`
-    Context   map[string]string `json:"context"` // metadata
+    Path      string `json:"path"`
+    SizeBytes int64  `json:"bytes"`
 }
 
 const HostVolumePluginMkdirID = "mkdir"
@@ -70,7 +69,6 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
     return &HostVolumePluginCreateResponse{
         Path:      path,
         SizeBytes: 0,
-        Context:   map[string]string{},
     }, nil
 }
 
@@ -147,8 +145,9 @@ func (p *HostVolumePluginExternal) Version(ctx context.Context) (*version.Versio
 func (p *HostVolumePluginExternal) Create(ctx context.Context,
     req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
 
-    params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): if this is nil, then PARAMETERS env will be "null"
+    params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): document that if this is nil, the PARAMETERS env var will be "null"
     if err != nil {
+        // this is a proper error, because users can set this in the volume spec
         return nil, fmt.Errorf("error marshaling volume parameters: %w", err)
     }
     envVars := []string{
@@ -165,7 +164,7 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
     }
 
     var pluginResp HostVolumePluginCreateResponse
-    err = json.Unmarshal(stdout, &pluginResp)
+    err = json.Unmarshal(stdout, &pluginResp) // db TODO(1.10.0): if this fails, the volume may have been created according to the plugin, but Nomad will not save it
     if err != nil {
         return nil, err
     }
diff --git a/client/hostvolumemanager/host_volume_plugin_test.go b/client/hostvolumemanager/host_volume_plugin_test.go
index 954686d74..18de2e1f3 100644
--- a/client/hostvolumemanager/host_volume_plugin_test.go
+++ b/client/hostvolumemanager/host_volume_plugin_test.go
@@ -45,7 +45,6 @@ func TestHostVolumePluginMkdir(t *testing.T) {
     must.Eq(t, &HostVolumePluginCreateResponse{
         Path:      target,
         SizeBytes: 0,
-        Context:   map[string]string{},
     }, resp)
     must.DirExists(t, target)
 
@@ -115,7 +114,6 @@ func TestHostVolumePluginExternal(t *testing.T) {
     must.Eq(t, &HostVolumePluginCreateResponse{
         Path:      target,
         SizeBytes: 5,
-        Context:   map[string]string{"key": "val"},
     }, resp)
     must.DirExists(t, target)
     logged := getLogs()
diff --git a/client/hostvolumemanager/host_volumes.go b/client/hostvolumemanager/host_volumes.go
index 39ab9bb89..9827084a7 100644
--- a/client/hostvolumemanager/host_volumes.go
+++ b/client/hostvolumemanager/host_volumes.go
@@ -7,9 +7,12 @@ import (
     "context"
     "errors"
     "path/filepath"
+    "time"
 
     "github.com/hashicorp/go-hclog"
+    "github.com/hashicorp/go-multierror"
     cstructs "github.com/hashicorp/nomad/client/structs"
+    "github.com/hashicorp/nomad/helper"
 )
 
 var (
@@ -17,22 +20,73 @@ var (
     ErrPluginNotExecutable = errors.New("plugin not executable")
 )
 
+type HostVolumeStateManager interface {
+    PutDynamicHostVolume(*cstructs.HostVolumeState) error
+    GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
+    DeleteDynamicHostVolume(string) error
+}
+
 type HostVolumeManager struct {
     pluginDir      string
     sharedMountDir string
+    stateMgr       HostVolumeStateManager
 
     log hclog.Logger
 }
 
-func NewHostVolumeManager(logger hclog.Logger, pluginDir, sharedMountDir string) *HostVolumeManager {
+func NewHostVolumeManager(logger hclog.Logger,
+    state HostVolumeStateManager, restoreTimeout time.Duration,
+    pluginDir, sharedMountDir string) (*HostVolumeManager, error) {
+
     log := logger.Named("host_volume_mgr")
 
     // db TODO(1.10.0): how do we define the external mounter plugins? plugin configs?
-    return &HostVolumeManager{
-        log:            log,
+    hvm := &HostVolumeManager{
         pluginDir:      pluginDir,
         sharedMountDir: sharedMountDir,
+        stateMgr:       state,
+        log:            log,
     }
+
+    if err := hvm.restoreState(state, restoreTimeout); err != nil {
+        return nil, err
+    }
+
+    return hvm, nil
+}
+
+func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout time.Duration) error {
+    vols, err := state.GetDynamicHostVolumes()
+    if err != nil {
+        return err
+    }
+    if len(vols) == 0 {
+        return nil // nothing to do
+    }
+
+    // re-"create" the volumes - plugins have the best knowledge of their
+    // side effects, and they must be idempotent.
+    group := multierror.Group{}
+    for _, vol := range vols {
+        group.Go(func() error { // db TODO(1.10.0): document that plugins must be safe to run concurrently
+            // missing plugins with associated volumes in state are considered
+            // client-stopping errors. they need to be fixed by cluster admins.
+            plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
+            if err != nil {
+                return err
+            }
+
+            ctx, cancel := context.WithTimeout(context.Background(), timeout)
+            defer cancel()
+            if _, err := plug.Create(ctx, vol.CreateReq); err != nil {
+                // plugin execution errors are only logged
+                hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
+            }
+            return nil
+        })
+    }
+    mErr := group.Wait()
+    return helper.FlattenMultierror(mErr.ErrorOrNil())
 }
 
 func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
@@ -63,14 +117,35 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
         return nil, err
     }
 
+    volState := &cstructs.HostVolumeState{
+        ID:        req.ID,
+        CreateReq: req,
+    }
+    if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
+        // if we fail to write to state, delete the volume so it isn't left
+        // lying around without Nomad knowing about it.
+        hvm.log.Error("failed to save volume in state, so deleting", "volume_id", req.ID, "error", err)
+        delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{
+            ID:         req.ID,
+            PluginID:   req.PluginID,
+            NodeID:     req.NodeID,
+            HostPath:   hvm.sharedMountDir,
+            Parameters: req.Parameters,
+        })
+        if delErr != nil {
+            hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr)
+            err = multierror.Append(err, delErr)
+        }
+        return nil, helper.FlattenMultierror(err)
+    }
+
+    // db TODO(1.10.0): now we need to add the volume to the node fingerprint!
+
     resp := &cstructs.ClientHostVolumeCreateResponse{
         HostPath:      pluginResp.Path,
         CapacityBytes: pluginResp.SizeBytes,
     }
 
-    // db TODO(1.10.0): now we need to add it to the node fingerprint!
-    // db TODO(1.10.0): and save it in client state!
-
     return resp, nil
 }
 
@@ -89,7 +164,10 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,
 
     resp := &cstructs.ClientHostVolumeDeleteResponse{}
 
-    // db TODO(1.10.0): save the client state!
+    if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
+        hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err)
+        return nil, err // bail so a user may retry
+    }
 
     return resp, nil
 }
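restoreState above fans the replays out with multierror.Group, which behaves like errgroup but collects every failure rather than stopping at the first, and helper.FlattenMultierror then unwraps a single-element result into a plain error. A standalone sketch of that collection pattern (the volume IDs are made up and the failure is canned):

    package main

    import (
        "fmt"

        "github.com/hashicorp/go-multierror"
    )

    func main() {
        group := multierror.Group{}
        for _, id := range []string{"vol-a", "vol-b"} {
            group.Go(func() error {
                // each func runs in its own goroutine, as the plugin
                // Create calls do in restoreState; Go 1.22+ per-iteration
                // loop scoping makes capturing id safe here
                return fmt.Errorf("plugin missing for %s", id)
            })
        }
        // Wait returns a *multierror.Error holding both failures;
        // ErrorOrNil converts an empty result to plain nil.
        if err := group.Wait().ErrorOrNil(); err != nil {
            fmt.Println(err)
        }
    }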
diff --git a/client/hostvolumemanager/host_volumes_test.go b/client/hostvolumemanager/host_volumes_test.go
new file mode 100644
index 000000000..a94526416
--- /dev/null
+++ b/client/hostvolumemanager/host_volumes_test.go
@@ -0,0 +1,53 @@
+// Copyright (c) HashiCorp, Inc.
+// SPDX-License-Identifier: BUSL-1.1
+
+package hostvolumemanager
+
+import (
+    "path/filepath"
+    "testing"
+    "time"
+
+    cstate "github.com/hashicorp/nomad/client/state"
+    cstructs "github.com/hashicorp/nomad/client/structs"
+    "github.com/hashicorp/nomad/helper/testlog"
+    "github.com/shoenig/test/must"
+)
+
+// db TODO(1.10.0): improve hostvolumemanager tests.
+
+func TestNewHostVolumeManager_restoreState(t *testing.T) {
+    log := testlog.HCLogger(t)
+    vol := &cstructs.HostVolumeState{
+        ID: "test-vol-id",
+        CreateReq: &cstructs.ClientHostVolumeCreateRequest{
+            ID:       "test-vol-id",
+            PluginID: "mkdir",
+        },
+    }
+
+    t.Run("happy", func(t *testing.T) {
+        // put our volume in state
+        state := cstate.NewMemDB(log)
+        must.NoError(t, state.PutDynamicHostVolume(vol))
+
+        // new volume manager should load it from state and run Create,
+        // resulting in a volume directory in this mountDir.
+        mountDir := t.TempDir()
+
+        _, err := NewHostVolumeManager(log, state, time.Second, "/wherever", mountDir)
+        must.NoError(t, err)
+
+        volPath := filepath.Join(mountDir, vol.ID)
+        must.DirExists(t, volPath)
+    })
+
+    t.Run("get error", func(t *testing.T) {
+        state := &cstate.ErrDB{}
+        _, err := NewHostVolumeManager(log, state, time.Second, "/wherever", "/wherever")
+        // error loading state should break the world
+        must.ErrorIs(t, err, cstate.ErrDBError)
+    })
+
+    // db TODO: test plugin error
+}
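The closing TODO in this test file covers the remaining branch: a plugin that exists but whose Create fails during restore is only logged, so manager construction still succeeds. A sketch of what that subtest might look like (not part of this patch; it reuses the mkdir-inside-a-file trick from host_volume_endpoint_test.go and assumes the surrounding log and vol from TestNewHostVolumeManager_restoreState):

    t.Run("plugin error", func(t *testing.T) {
        // put a volume for the real "mkdir" plugin in state
        state := cstate.NewMemDB(log)
        must.NoError(t, state.PutDynamicHostVolume(vol))

        // "host_volumes_test.go" is a file, so mkdir cannot create a
        // directory inside it and Create fails during restore; the
        // failure is only logged and construction still succeeds.
        _, err := NewHostVolumeManager(log, state, time.Second, "/wherever", "host_volumes_test.go")
        must.NoError(t, err)
    })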
diff --git a/client/state/db_bolt.go b/client/state/db_bolt.go
index 2471cda3d..bef111f6e 100644
--- a/client/state/db_bolt.go
+++ b/client/state/db_bolt.go
@@ -138,6 +138,8 @@ var (
 
     // nodeRegistrationKey is the key at which node registration data is stored.
     nodeRegistrationKey = []byte("node_registration")
+
+    hostVolBucket = []byte("host_volumes_to_create")
 )
 
 // taskBucketName returns the bucket name for the given task name.
@@ -1048,6 +1050,45 @@ func (s *BoltStateDB) GetNodeRegistration() (*cstructs.NodeRegistration, error)
     return &reg, err
 }
 
+func (s *BoltStateDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error {
+    return s.db.Update(func(tx *boltdd.Tx) error {
+        b, err := tx.CreateBucketIfNotExists(hostVolBucket)
+        if err != nil {
+            return err
+        }
+        return b.Put([]byte(vol.ID), vol)
+    })
+}
+
+func (s *BoltStateDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
+    var vols []*cstructs.HostVolumeState
+    err := s.db.View(func(tx *boltdd.Tx) error {
+        b := tx.Bucket(hostVolBucket)
+        if b == nil {
+            return nil
+        }
+        return b.BoltBucket().ForEach(func(k, v []byte) error {
+            var vol cstructs.HostVolumeState
+            err := b.Get(k, &vol)
+            if err != nil {
+                return err
+            }
+            vols = append(vols, &vol)
+            return nil
+        })
+    })
+    if boltdd.IsErrNotFound(err) {
+        return nil, nil
+    }
+    return vols, err
+}
+
+func (s *BoltStateDB) DeleteDynamicHostVolume(id string) error {
+    return s.db.Update(func(tx *boltdd.Tx) error {
+        return tx.Bucket(hostVolBucket).Delete([]byte(id))
+    })
+}
+
 // init initializes metadata entries in a newly created state database.
 func (s *BoltStateDB) init() error {
     return s.db.Update(func(tx *boltdd.Tx) error {
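The next file exports a sentinel from ErrDB instead of another inline fmt.Errorf, which is what lets the restore test assert with must.ErrorIs on error identity rather than matching error text. A minimal illustration of the distinction, using only the standard library:

    package main

    import (
        "errors"
        "fmt"
    )

    var ErrDBError = errors.New("Error!")

    func main() {
        // wrapping with %w preserves identity through the error chain
        err := fmt.Errorf("restoring dynamic host volumes: %w", ErrDBError)
        fmt.Println(errors.Is(err, ErrDBError)) // true

        // a freshly constructed error with the same text does not match
        fmt.Println(errors.Is(errors.New("Error!"), ErrDBError)) // false
    }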
diff --git a/client/state/db_error.go b/client/state/db_error.go
index 78ef01b78..6c99defa2 100644
--- a/client/state/db_error.go
+++ b/client/state/db_error.go
@@ -4,6 +4,7 @@
 package state
 
 import (
+    "errors"
     "fmt"
 
     arstate "github.com/hashicorp/nomad/client/allocrunner/state"
@@ -16,6 +17,10 @@ import (
     "github.com/hashicorp/nomad/nomad/structs"
 )
 
+var _ StateDB = &ErrDB{}
+
+var ErrDBError = errors.New("Error!")
+
 // ErrDB implements a StateDB that returns errors on restore methods, used for testing
 type ErrDB struct {
     // Allocs is a preset slice of allocations used in GetAllAllocations
@@ -154,6 +159,16 @@ func (m *ErrDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
     return nil, fmt.Errorf("Error!")
 }
 
+func (m *ErrDB) PutDynamicHostVolume(_ *cstructs.HostVolumeState) error {
+    return ErrDBError
+}
+func (m *ErrDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
+    return nil, ErrDBError
+}
+func (m *ErrDB) DeleteDynamicHostVolume(_ string) error {
+    return ErrDBError
+}
+
 func (m *ErrDB) Close() error {
     return fmt.Errorf("Error!")
 }
diff --git a/client/state/db_mem.go b/client/state/db_mem.go
index 91e6481b4..32abd883e 100644
--- a/client/state/db_mem.go
+++ b/client/state/db_mem.go
@@ -60,6 +60,8 @@ type MemDB struct {
 
     nodeRegistration *cstructs.NodeRegistration
 
+    dynamicHostVolumes map[string]*cstructs.HostVolumeState
+
     logger hclog.Logger
 
     mu sync.RWMutex
@@ -68,15 +70,16 @@ type MemDB struct {
 func NewMemDB(logger hclog.Logger) *MemDB {
     logger = logger.Named("memdb")
     return &MemDB{
-        allocs:            make(map[string]*structs.Allocation),
-        deployStatus:      make(map[string]*structs.AllocDeploymentStatus),
-        networkStatus:     make(map[string]*structs.AllocNetworkStatus),
-        acknowledgedState: make(map[string]*arstate.State),
-        localTaskState:    make(map[string]map[string]*state.LocalState),
-        taskState:         make(map[string]map[string]*structs.TaskState),
-        checks:            make(checks.ClientResults),
-        identities:        make(map[string][]*structs.SignedWorkloadIdentity),
-        logger:            logger,
+        allocs:             make(map[string]*structs.Allocation),
+        deployStatus:       make(map[string]*structs.AllocDeploymentStatus),
+        networkStatus:      make(map[string]*structs.AllocNetworkStatus),
+        acknowledgedState:  make(map[string]*arstate.State),
+        localTaskState:     make(map[string]map[string]*state.LocalState),
+        taskState:          make(map[string]map[string]*structs.TaskState),
+        checks:             make(checks.ClientResults),
+        identities:         make(map[string][]*structs.SignedWorkloadIdentity),
+        dynamicHostVolumes: make(map[string]*cstructs.HostVolumeState),
+        logger:             logger,
     }
 }
 
@@ -354,6 +357,28 @@ func (m *MemDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
     return m.nodeRegistration, nil
 }
 
+func (m *MemDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    m.dynamicHostVolumes[vol.ID] = vol
+    return nil
+}
+func (m *MemDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    var vols []*cstructs.HostVolumeState
+    for _, vol := range m.dynamicHostVolumes {
+        vols = append(vols, vol)
+    }
+    return vols, nil
+}
+func (m *MemDB) DeleteDynamicHostVolume(s string) error {
+    m.mu.Lock()
+    defer m.mu.Unlock()
+    delete(m.dynamicHostVolumes, s)
+    return nil
+}
+
 func (m *MemDB) Close() error {
     m.mu.Lock()
     defer m.mu.Unlock()
diff --git a/client/state/db_noop.go b/client/state/db_noop.go
index 345025a4d..09488c181 100644
--- a/client/state/db_noop.go
+++ b/client/state/db_noop.go
@@ -14,6 +14,8 @@ import (
     "github.com/hashicorp/nomad/nomad/structs"
 )
 
+var _ StateDB = &NoopDB{}
+
 // NoopDB implements a StateDB that does not persist any data.
 type NoopDB struct{}
 
@@ -145,6 +147,16 @@ func (n NoopDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
     return nil, nil
 }
 
+func (n NoopDB) PutDynamicHostVolume(_ *cstructs.HostVolumeState) error {
+    return nil
+}
+func (n NoopDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
+    return nil, nil
+}
+func (n NoopDB) DeleteDynamicHostVolume(_ string) error {
+    return nil
+}
+
 func (n NoopDB) Close() error {
     return nil
 }
diff --git a/client/state/db_test.go b/client/state/db_test.go
index d13431a62..3a03cf3a2 100644
--- a/client/state/db_test.go
+++ b/client/state/db_test.go
@@ -15,6 +15,7 @@ import (
     dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
     "github.com/hashicorp/nomad/client/dynamicplugins"
     driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
+    cstructs "github.com/hashicorp/nomad/client/structs"
     "github.com/hashicorp/nomad/helper/testlog"
     "github.com/hashicorp/nomad/nomad/mock"
     "github.com/hashicorp/nomad/nomad/structs"
@@ -384,6 +385,41 @@ func TestStateDB_DynamicRegistry(t *testing.T) {
     })
 }
 
+// TestStateDB_HostVolumes asserts the behavior of dynamic host volume state.
+func TestStateDB_HostVolumes(t *testing.T) {
+    ci.Parallel(t)
+
+    testDB(t, func(t *testing.T, db StateDB) {
+        vols, err := db.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 0, vols)
+
+        vol := &cstructs.HostVolumeState{
+            ID: "test-vol-id",
+            CreateReq: &cstructs.ClientHostVolumeCreateRequest{
+                ID:                        "test-vol-id",
+                Name:                      "test-vol-name",
+                PluginID:                  "test-plugin-id",
+                NodeID:                    "test-node-id",
+                RequestedCapacityMinBytes: 5,
+                RequestedCapacityMaxBytes: 10,
+                Parameters:                map[string]string{"test": "ing"},
+            },
+        }
+
+        must.NoError(t, db.PutDynamicHostVolume(vol))
+        vols, err = db.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 1, vols)
+        must.Eq(t, vol, vols[0])
+
+        must.NoError(t, db.DeleteDynamicHostVolume(vol.ID))
+        vols, err = db.GetDynamicHostVolumes()
+        must.NoError(t, err)
+        must.Len(t, 0, vols)
+    })
+}
+
 func TestStateDB_CheckResult_keyForCheck(t *testing.T) {
     ci.Parallel(t)
 
diff --git a/client/state/interface.go b/client/state/interface.go
index a9cd48450..0460a75e2 100644
--- a/client/state/interface.go
+++ b/client/state/interface.go
@@ -137,6 +137,10 @@ type StateDB interface {
     PutNodeRegistration(*cstructs.NodeRegistration) error
     GetNodeRegistration() (*cstructs.NodeRegistration, error)
 
+    PutDynamicHostVolume(*cstructs.HostVolumeState) error
+    GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
+    DeleteDynamicHostVolume(string) error
+
     // Close the database. Unsafe for further use after calling regardless
     // of return value.
     Close() error
diff --git a/client/structs/host_volumes.go b/client/structs/host_volumes.go
index 38d3cb2d7..3188e45dc 100644
--- a/client/structs/host_volumes.go
+++ b/client/structs/host_volumes.go
@@ -3,6 +3,11 @@
 
 package structs
 
+type HostVolumeState struct {
+    ID        string
+    CreateReq *ClientHostVolumeCreateRequest
+}
+
 type ClientHostVolumeCreateRequest struct {
     // ID is a UUID-like string generated by the server.
     ID string
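Taken together, the new surface is small: Create persists a HostVolumeState through the StateDB, Delete removes it, and a fresh HostVolumeManager replays whatever it finds. A hedged end-to-end sketch over the in-memory StateDB (the paths are illustrative and error handling is reduced to panics):

    package main

    import (
        "context"
        "fmt"
        "time"

        "github.com/hashicorp/go-hclog"
        hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
        "github.com/hashicorp/nomad/client/state"
        cstructs "github.com/hashicorp/nomad/client/structs"
    )

    func main() {
        log := hclog.NewNullLogger()
        db := state.NewMemDB(log)

        // first "agent run": create a volume with the built-in mkdir plugin,
        // which also writes a HostVolumeState entry into db
        mgr, err := hvm.NewHostVolumeManager(log, db, time.Second,
            "/opt/nomad/plugins", "/opt/nomad/mounts")
        if err != nil {
            panic(err)
        }
        resp, err := mgr.Create(context.Background(),
            &cstructs.ClientHostVolumeCreateRequest{ID: "example-vol", PluginID: "mkdir"})
        if err != nil {
            panic(err)
        }
        fmt.Println("created:", resp.HostPath)

        // second "agent run" over the same state simulates a restart:
        // NewHostVolumeManager finds example-vol and replays mkdir's Create
        if _, err := hvm.NewHostVolumeManager(log, db, time.Second,
            "/opt/nomad/plugins", "/opt/nomad/mounts"); err != nil {
            panic(err)
        }
    }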