dynamic host volumes: client state (#24595)

store dynamic host volume creations in client state,
so they can be "restored" on agent restart. restore works
by repeating the same Create operation as the initial
creation, and expecting the plugin to be idempotent.

this is (potentially) especially important after host restarts,
which may have dropped mount points or the like.
Daniel Bennett
2024-12-03 16:47:37 -05:00
committed by Tim Gross
parent 787fbbe671
commit 05f1cda594
13 changed files with 323 additions and 29 deletions
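The restore path leans entirely on plugin idempotency: restore re-runs Create with the stored request and expects it to succeed even though the volume already exists. A minimal sketch of what that means for a mkdir-style plugin (illustrative only; the names and signature here are simplified and not the shipped HostVolumePluginMkdir):

package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// createVolumeDir is an idempotent Create: os.MkdirAll succeeds whether or
// not the directory already exists, so re-running it during client restore
// (or after a host reboot dropped the directory) is harmless.
func createVolumeDir(mountDir, volID string) (string, error) {
	path := filepath.Join(mountDir, volID)
	if err := os.MkdirAll(path, 0o700); err != nil {
		return "", fmt.Errorf("create volume dir %q: %w", path, err)
	}
	return path, nil
}

func main() {
	// calling twice demonstrates the property restoreState relies on
	for i := 0; i < 2; i++ {
		path, err := createVolumeDir(os.TempDir(), "example-vol-id")
		fmt.Println(path, err)
	}
}

External plugins get the same contract: a repeated create for the same volume ID is expected to succeed rather than fail because the volume already exists.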

View File

@@ -535,9 +535,16 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.devicemanager = devManager
c.pluginManagers.RegisterAndRun(devManager)
c.hostVolumeManager = hvm.NewHostVolumeManager(logger,
c.hostVolumeManager, err = hvm.NewHostVolumeManager(logger,
c.stateDB, hostVolumeRequestTimeout,
cfg.HostVolumePluginDir,
cfg.AllocMountsDir)
if err != nil {
// NewHostVolumeManager will only err if it fails to read the state store,
// or if one or more required plugins do not exist, so halt the client
// because something needs to be fixed by a cluster admin.
return nil, err
}
// Set up the service registration wrapper using the Consul and Nomad
// implementations. The Nomad implementation is only ever used on the

View File

@@ -6,9 +6,11 @@ package client
import (
"path/filepath"
"testing"
"time"
"github.com/hashicorp/nomad/ci"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/shoenig/test/must"
@@ -20,10 +22,15 @@ func TestHostVolume(t *testing.T) {
client, cleanup := TestClient(t, nil)
defer cleanup()
memdb := state.NewMemDB(testlog.HCLogger(t))
client.stateDB = memdb
tmp := t.TempDir()
var err error
expectDir := filepath.Join(tmp, "test-vol-id")
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
"/no/ext/plugins", tmp)
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", tmp)
must.NoError(t, err)
t.Run("happy", func(t *testing.T) {
req := &cstructs.ClientHostVolumeCreateRequest{
@@ -40,6 +47,15 @@ func TestHostVolume(t *testing.T) {
}, resp)
// technically this is testing "mkdir" more than the RPC
must.DirExists(t, expectDir)
// ensure we saved to client state
vols, err := memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 1, vols)
expectState := &cstructs.HostVolumeState{
ID: req.ID,
CreateReq: req,
}
must.Eq(t, expectState, vols[0])
delReq := &cstructs.ClientHostVolumeDeleteRequest{
ID: "test-vol-id",
@@ -52,6 +68,10 @@ func TestHostVolume(t *testing.T) {
must.NotNil(t, delResp)
// again, actually testing the "mkdir" plugin
must.DirNotExists(t, expectDir)
// client state should be deleted
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
})
t.Run("missing plugin", func(t *testing.T) {
@@ -72,8 +92,9 @@ func TestHostVolume(t *testing.T) {
t.Run("error from plugin", func(t *testing.T) {
// "mkdir" plugin can't create a directory within a file
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t),
"/no/ext/plugins", "host_volume_endpoint_test.go")
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
must.NoError(t, err)
req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",

View File

@@ -29,9 +29,8 @@ type HostVolumePlugin interface {
}
type HostVolumePluginCreateResponse struct {
Path string `json:"path"`
SizeBytes int64 `json:"bytes"`
Context map[string]string `json:"context"` // metadata
Path string `json:"path"`
SizeBytes int64 `json:"bytes"`
}
const HostVolumePluginMkdirID = "mkdir"
@@ -70,7 +69,6 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
return &HostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
Context: map[string]string{},
}, nil
}
@@ -147,8 +145,9 @@ func (p *HostVolumePluginExternal) Version(ctx context.Context) (*version.Versio
func (p *HostVolumePluginExternal) Create(ctx context.Context,
req *cstructs.ClientHostVolumeCreateRequest) (*HostVolumePluginCreateResponse, error) {
params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): if this is nil, then PARAMETERS env will be "null"
params, err := json.Marshal(req.Parameters) // db TODO(1.10.0): document that if this is nil, the PARAMETERS env var will be "null"
if err != nil {
// this is a proper error, because users can set this in the volume spec
return nil, fmt.Errorf("error marshaling volume parameters: %w", err)
}
envVars := []string{
@@ -165,7 +164,7 @@ func (p *HostVolumePluginExternal) Create(ctx context.Context,
}
var pluginResp HostVolumePluginCreateResponse
err = json.Unmarshal(stdout, &pluginResp)
err = json.Unmarshal(stdout, &pluginResp) // db TODO(1.10.0): if this fails, then the volume may have been created, according to the plugin, but Nomad will not save it
if err != nil {
return nil, err
}

View File

@@ -45,7 +45,6 @@ func TestHostVolumePluginMkdir(t *testing.T) {
must.Eq(t, &HostVolumePluginCreateResponse{
Path: target,
SizeBytes: 0,
Context: map[string]string{},
}, resp)
must.DirExists(t, target)
@@ -115,7 +114,6 @@ func TestHostVolumePluginExternal(t *testing.T) {
must.Eq(t, &HostVolumePluginCreateResponse{
Path: target,
SizeBytes: 5,
Context: map[string]string{"key": "val"},
}, resp)
must.DirExists(t, target)
logged := getLogs()

View File

@@ -7,9 +7,12 @@ import (
"context"
"errors"
"path/filepath"
"time"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
)
var (
@@ -17,22 +20,73 @@ var (
ErrPluginNotExecutable = errors.New("plugin not executable")
)
type HostVolumeStateManager interface {
PutDynamicHostVolume(*cstructs.HostVolumeState) error
GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
DeleteDynamicHostVolume(string) error
}
type HostVolumeManager struct {
pluginDir string
sharedMountDir string
stateMgr HostVolumeStateManager
log hclog.Logger
}
func NewHostVolumeManager(logger hclog.Logger, pluginDir, sharedMountDir string) *HostVolumeManager {
func NewHostVolumeManager(logger hclog.Logger,
state HostVolumeStateManager, restoreTimeout time.Duration,
pluginDir, sharedMountDir string) (*HostVolumeManager, error) {
log := logger.Named("host_volume_mgr")
// db TODO(1.10.0): how do we define the external mounter plugins? plugin configs?
return &HostVolumeManager{
log: log,
hvm := &HostVolumeManager{
pluginDir: pluginDir,
sharedMountDir: sharedMountDir,
stateMgr: state,
log: log,
}
if err := hvm.restoreState(state, restoreTimeout); err != nil {
return nil, err
}
return hvm, nil
}
func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout time.Duration) error {
vols, err := state.GetDynamicHostVolumes()
if err != nil {
return err
}
if len(vols) == 0 {
return nil // nothing to do
}
// re-"create" the volumes - plugins have the best knowledge of their
// side effects, and they must be idempotent.
group := multierror.Group{}
for _, vol := range vols {
group.Go(func() error { // db TODO(1.10.0): document that plugins must be safe to run concurrently
// missing plugins with associated volumes in state are considered
// client-stopping errors. they need to be fixed by cluster admins.
plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if _, err := plug.Create(ctx, vol.CreateReq); err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
}
return nil
})
}
mErr := group.Wait()
return helper.FlattenMultierror(mErr.ErrorOrNil())
}
func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
@@ -63,14 +117,35 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
return nil, err
}
volState := &cstructs.HostVolumeState{
ID: req.ID,
CreateReq: req,
}
if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
// if we fail to write to state, delete the volume so it isn't left
// lying around without Nomad knowing about it.
hvm.log.Error("failed to save volume in state, so deleting", "volume_id", req.ID, "error", err)
delErr := plug.Delete(ctx, &cstructs.ClientHostVolumeDeleteRequest{
ID: req.ID,
PluginID: req.PluginID,
NodeID: req.NodeID,
HostPath: hvm.sharedMountDir,
Parameters: req.Parameters,
})
if delErr != nil {
hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr)
err = multierror.Append(err, delErr)
}
return nil, helper.FlattenMultierror(err)
}
// db TODO(1.10.0): now we need to add the volume to the node fingerprint!
resp := &cstructs.ClientHostVolumeCreateResponse{
HostPath: pluginResp.Path,
CapacityBytes: pluginResp.SizeBytes,
}
// db TODO(1.10.0): now we need to add it to the node fingerprint!
// db TODO(1.10.0): and save it in client state!
return resp, nil
}
@@ -89,7 +164,10 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,
resp := &cstructs.ClientHostVolumeDeleteResponse{}
// db TODO(1.10.0): save the client state!
if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err)
return nil, err // bail so a user may retry
}
return resp, nil
}

View File

@@ -0,0 +1,53 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package hostvolumemanager
import (
"path/filepath"
"testing"
"time"
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/shoenig/test/must"
)
// db TODO(1.10.0): improve hostvolumemanager tests.
func TestNewHostVolumeManager_restoreState(t *testing.T) {
log := testlog.HCLogger(t)
vol := &cstructs.HostVolumeState{
ID: "test-vol-id",
CreateReq: &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
PluginID: "mkdir",
},
}
t.Run("happy", func(t *testing.T) {
// put our volume in state
state := cstate.NewMemDB(log)
must.NoError(t, state.PutDynamicHostVolume(vol))
// new volume manager should load it from state and run Create,
// resulting in a volume directory in this mountDir.
mountDir := t.TempDir()
_, err := NewHostVolumeManager(log, state, time.Second, "/wherever", mountDir)
must.NoError(t, err)
volPath := filepath.Join(mountDir, vol.ID)
must.DirExists(t, volPath)
})
t.Run("get error", func(t *testing.T) {
state := &cstate.ErrDB{}
_, err := NewHostVolumeManager(log, state, time.Second, "/wherever", "/wherever")
// error loading state should break the world
must.ErrorIs(t, err, cstate.ErrDBError)
})
// db TODO: test plugin error
}
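The "db TODO: test plugin error" above might later be filled in along these lines — a hypothetical sub-test (an extra "os" import is assumed) where the built-in mkdir plugin's Create fails during restore because the mount dir path is actually a regular file; since restoreState only logs plugin execution errors, the manager should still construct without error:

	t.Run("plugin error", func(t *testing.T) {
		// put the volume in state, then point the manager at a mount "dir"
		// that is really a file, so the built-in mkdir plugin fails
		state := cstate.NewMemDB(log)
		must.NoError(t, state.PutDynamicHostVolume(vol))

		notADir := filepath.Join(t.TempDir(), "not-a-dir")
		must.NoError(t, os.WriteFile(notADir, []byte("x"), 0o644))

		// plugin execution errors during restore are logged, not returned
		_, err := NewHostVolumeManager(log, state, time.Second, "/wherever", notADir)
		must.NoError(t, err)
	})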

View File

@@ -138,6 +138,8 @@ var (
// nodeRegistrationKey is the key at which node registration data is stored.
nodeRegistrationKey = []byte("node_registration")
hostVolBucket = []byte("host_volumes_to_create")
)
// taskBucketName returns the bucket name for the given task name.
@@ -1048,6 +1050,45 @@ func (s *BoltStateDB) GetNodeRegistration() (*cstructs.NodeRegistration, error)
return &reg, err
}
func (s *BoltStateDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error {
return s.db.Update(func(tx *boltdd.Tx) error {
b, err := tx.CreateBucketIfNotExists(hostVolBucket)
if err != nil {
return err
}
return b.Put([]byte(vol.ID), vol)
})
}
func (s *BoltStateDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
var vols []*cstructs.HostVolumeState
err := s.db.View(func(tx *boltdd.Tx) error {
b := tx.Bucket(hostVolBucket)
if b == nil {
return nil
}
return b.BoltBucket().ForEach(func(k, v []byte) error {
var vol cstructs.HostVolumeState
err := b.Get(k, &vol)
if err != nil {
return err
}
vols = append(vols, &vol)
return nil
})
})
if boltdd.IsErrNotFound(err) {
return nil, nil
}
return vols, err
}
func (s *BoltStateDB) DeleteDynamicHostVolume(id string) error {
return s.db.Update(func(tx *boltdd.Tx) error {
return tx.Bucket(hostVolBucket).Delete([]byte(id))
})
}
// init initializes metadata entries in a newly created state database.
func (s *BoltStateDB) init() error {
return s.db.Update(func(tx *boltdd.Tx) error {

View File

@@ -4,6 +4,7 @@
package state
import (
"errors"
"fmt"
arstate "github.com/hashicorp/nomad/client/allocrunner/state"
@@ -16,6 +17,10 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
var _ StateDB = &ErrDB{}
var ErrDBError = errors.New("Error!")
// ErrDB implements a StateDB that returns errors on restore methods, used for testing
type ErrDB struct {
// Allocs is a preset slice of allocations used in GetAllAllocations
@@ -154,6 +159,16 @@ func (m *ErrDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
return nil, fmt.Errorf("Error!")
}
func (m *ErrDB) PutDynamicHostVolume(_ *cstructs.HostVolumeState) error {
return ErrDBError
}
func (m *ErrDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
return nil, ErrDBError
}
func (m *ErrDB) DeleteDynamicHostVolume(_ string) error {
return ErrDBError
}
func (m *ErrDB) Close() error {
return fmt.Errorf("Error!")
}

View File

@@ -60,6 +60,8 @@ type MemDB struct {
nodeRegistration *cstructs.NodeRegistration
dynamicHostVolumes map[string]*cstructs.HostVolumeState
logger hclog.Logger
mu sync.RWMutex
@@ -68,15 +70,16 @@ type MemDB struct {
func NewMemDB(logger hclog.Logger) *MemDB {
logger = logger.Named("memdb")
return &MemDB{
allocs: make(map[string]*structs.Allocation),
deployStatus: make(map[string]*structs.AllocDeploymentStatus),
networkStatus: make(map[string]*structs.AllocNetworkStatus),
acknowledgedState: make(map[string]*arstate.State),
localTaskState: make(map[string]map[string]*state.LocalState),
taskState: make(map[string]map[string]*structs.TaskState),
checks: make(checks.ClientResults),
identities: make(map[string][]*structs.SignedWorkloadIdentity),
logger: logger,
allocs: make(map[string]*structs.Allocation),
deployStatus: make(map[string]*structs.AllocDeploymentStatus),
networkStatus: make(map[string]*structs.AllocNetworkStatus),
acknowledgedState: make(map[string]*arstate.State),
localTaskState: make(map[string]map[string]*state.LocalState),
taskState: make(map[string]map[string]*structs.TaskState),
checks: make(checks.ClientResults),
identities: make(map[string][]*structs.SignedWorkloadIdentity),
dynamicHostVolumes: make(map[string]*cstructs.HostVolumeState),
logger: logger,
}
}
@@ -354,6 +357,28 @@ func (m *MemDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
return m.nodeRegistration, nil
}
func (m *MemDB) PutDynamicHostVolume(vol *cstructs.HostVolumeState) error {
m.mu.Lock()
defer m.mu.Unlock()
m.dynamicHostVolumes[vol.ID] = vol
return nil
}
func (m *MemDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
m.mu.Lock()
defer m.mu.Unlock()
var vols []*cstructs.HostVolumeState
for _, vol := range m.dynamicHostVolumes {
vols = append(vols, vol)
}
return vols, nil
}
func (m *MemDB) DeleteDynamicHostVolume(s string) error {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.dynamicHostVolumes, s)
return nil
}
func (m *MemDB) Close() error {
m.mu.Lock()
defer m.mu.Unlock()

View File

@@ -14,6 +14,8 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
)
var _ StateDB = &NoopDB{}
// NoopDB implements a StateDB that does not persist any data.
type NoopDB struct{}
@@ -145,6 +147,16 @@ func (n NoopDB) GetNodeRegistration() (*cstructs.NodeRegistration, error) {
return nil, nil
}
func (n NoopDB) PutDynamicHostVolume(_ *cstructs.HostVolumeState) error {
return nil
}
func (n NoopDB) GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error) {
return nil, nil
}
func (n NoopDB) DeleteDynamicHostVolume(_ string) error {
return nil
}
func (n NoopDB) Close() error {
return nil
}

View File

@@ -15,6 +15,7 @@ import (
dmstate "github.com/hashicorp/nomad/client/devicemanager/state"
"github.com/hashicorp/nomad/client/dynamicplugins"
driverstate "github.com/hashicorp/nomad/client/pluginmanager/drivermanager/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
@@ -384,6 +385,41 @@ func TestStateDB_DynamicRegistry(t *testing.T) {
})
}
// TestStateDB_HostVolumes asserts the behavior of dynamic host volume state.
func TestStateDB_HostVolumes(t *testing.T) {
ci.Parallel(t)
testDB(t, func(t *testing.T, db StateDB) {
vols, err := db.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
vol := &cstructs.HostVolumeState{
ID: "test-vol-id",
CreateReq: &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
Name: "test-vol-name",
PluginID: "test-plugin-id",
NodeID: "test-node-id",
RequestedCapacityMinBytes: 5,
RequestedCapacityMaxBytes: 10,
Parameters: map[string]string{"test": "ing"},
},
}
must.NoError(t, db.PutDynamicHostVolume(vol))
vols, err = db.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 1, vols)
must.Eq(t, vol, vols[0])
must.NoError(t, db.DeleteDynamicHostVolume(vol.ID))
vols, err = db.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
})
}
func TestStateDB_CheckResult_keyForCheck(t *testing.T) {
ci.Parallel(t)

View File

@@ -137,6 +137,10 @@ type StateDB interface {
PutNodeRegistration(*cstructs.NodeRegistration) error
GetNodeRegistration() (*cstructs.NodeRegistration, error)
PutDynamicHostVolume(*cstructs.HostVolumeState) error
GetDynamicHostVolumes() ([]*cstructs.HostVolumeState, error)
DeleteDynamicHostVolume(string) error
// Close the database. Unsafe for further use after calling regardless
// of return value.
Close() error

View File

@@ -3,6 +3,11 @@
package structs
type HostVolumeState struct {
ID string
CreateReq *ClientHostVolumeCreateRequest
}
type ClientHostVolumeCreateRequest struct {
// ID is a UUID-like string generated by the server.
ID string