dynamic host volumes: unique volume name per node (#24748)
A node can have only one volume with a given name. The scheduler prevents duplicates, but it can only do so after the server knows about the volume, so this change stops multiple concurrent creates from racing faster than the fingerprint/heartbeat interval. Users may still modify an existing volume, but only by setting the `id` in the volume spec and re-issuing `nomad volume create`. If a *static* volume is added to the client config with a name already in use by a dynamic volume, the dynamic volume takes precedence, but a warning is logged.
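The client-side guard is a name-keyed lock built on sync.Map.LoadOrStore: locking the same name with the same ID is idempotent (which is what lets a re-issued `nomad volume create` with `id` set update a volume in place), while a different ID is rejected. A minimal standalone sketch of that idea, mirroring the volLocker added in the diff below (the nameLocker type and main function here are illustrative only, not part of the change):

package main

import (
	"errors"
	"fmt"
	"sync"
)

var errVolumeNameExists = errors.New("volume name already exists on this node")

// nameLocker keeps one entry per volume name, storing the ID that claimed it.
type nameLocker struct {
	locks sync.Map
}

// lock claims name for id; it succeeds if the name is unclaimed or already
// claimed by the same id, and fails if a different id holds it.
func (l *nameLocker) lock(name, id string) error {
	current, exists := l.locks.LoadOrStore(name, id)
	if exists && current.(string) != id {
		return errVolumeNameExists
	}
	return nil
}

// release frees the name for reuse, e.g. after a delete or a failed create.
func (l *nameLocker) release(name string) {
	l.locks.Delete(name)
}

func main() {
	l := &nameLocker{}
	fmt.Println(l.lock("scratch", "vol-1")) // <nil>: first claim wins
	fmt.Println(l.lock("scratch", "vol-1")) // <nil>: same ID is idempotent (update case)
	fmt.Println(l.lock("scratch", "vol-2")) // volume name already exists on this node
	l.release("scratch")
	fmt.Println(l.lock("scratch", "vol-2")) // <nil>: name is free again after release
}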
@@ -408,6 +408,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
 	}
 
 	c.batchNodeUpdates = newBatchNodeUpdates(
+		c.logger,
 		c.updateNodeFromDriver,
 		c.updateNodeFromDevices,
 		c.updateNodeFromCSI,
@@ -19,6 +19,7 @@ import (
 var (
 	ErrPluginNotExists     = errors.New("no such plugin")
 	ErrPluginNotExecutable = errors.New("plugin not executable")
+	ErrVolumeNameExists    = errors.New("volume name already exists on this node")
 )
 
 // HostVolumeStateManager manages the lifecycle of volumes in client state.
@@ -53,6 +54,7 @@ type HostVolumeManager struct {
 	stateMgr       HostVolumeStateManager
 	updateNodeVols HostVolumeNodeUpdater
 	builtIns       map[string]HostVolumePlugin
+	locker         *volLocker
 	log            hclog.Logger
 }
 
@@ -71,6 +73,7 @@ func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager
 			log: logger.With("plugin_id", HostVolumePluginMkdirID),
 		},
 	},
+	locker: &volLocker{},
 	log:    logger,
 	}
 }
@@ -85,8 +88,14 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
 		return nil, err
 	}
 
+	// can't have two of the same volume name w/ different IDs per client node
+	if err := hvm.locker.lock(req.Name, req.ID); err != nil {
+		return nil, err
+	}
+
 	pluginResp, err := plug.Create(ctx, req)
 	if err != nil {
+		hvm.locker.release(req.Name)
 		return nil, err
 	}
 
@@ -109,6 +118,8 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
 			hvm.log.Warn("error deleting volume after state store failure", "volume_id", req.ID, "error", delErr)
 			err = multierror.Append(err, delErr)
 		}
+		// free up the volume name whether delete succeeded or not.
+		hvm.locker.release(req.Name)
 		return nil, helper.FlattenMultierror(err)
 	}
 
@@ -144,6 +155,9 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,
 		return nil, err // bail so a user may retry
 	}
 
+	// free up volume name for reuse
+	hvm.locker.release(req.Name)
+
 	hvm.updateNodeVols(req.Name, nil)
 
 	resp := &cstructs.ClientHostVolumeDeleteResponse{
@@ -191,6 +205,21 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap,
 				return err
 			}
 
+			// lock the name so future creates can't produce duplicates.
+			err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
+			// state should never have duplicate vol names, and restore happens
+			// prior to node registration, so new creates shouldn't come in
+			// concurrently, but check for error just in case.
+			if err != nil {
+				hvm.log.Error("error during restore",
+					"volume_name", vol.CreateReq.Name,
+					"volume_id", vol.CreateReq.ID,
+					"error", err)
+				// don't stop the world if it does happen, because an admin
+				// couldn't do anything about it short of wiping client state.
+				return nil
+			}
+
 			resp, err := plug.Create(ctx, vol.CreateReq)
 			if err != nil {
 				// plugin execution errors are only logged
@@ -221,3 +250,29 @@ func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumeP
 		ReadOnly: false,
 	}
 }
+
+// volLocker is used to ensure that volumes on each node are unique by name.
+// The volume scheduler will prevent this too, but only after node fingerprint,
+// so we need to protect against concurrent duplicate creates.
+type volLocker struct {
+	locks sync.Map
+}
+
+// lock the provided name, error if it was already locked with a different ID
+func (l *volLocker) lock(name, id string) error {
+	current, exists := l.locks.LoadOrStore(name, id)
+	if exists && id != current.(string) {
+		return ErrVolumeNameExists
+	}
+	return nil
+}
+
+func (l *volLocker) release(name string) {
+	l.locks.Delete(name)
+}
+
+// only used in tests to assert lock state
+func (l *volLocker) isLocked(name string) bool {
+	_, locked := l.locks.Load(name)
+	return locked
+}
@@ -27,7 +27,7 @@ func TestHostVolumeManager(t *testing.T) {
 	tmp := t.TempDir()
 	errDB := &cstate.ErrDB{}
 	memDB := cstate.NewMemDB(log)
-	node := newFakeNode()
+	node := newFakeNode(t)
 
 	hvm := NewHostVolumeManager(log, Config{
 		PluginDir: "./test_fixtures",
@@ -43,9 +43,10 @@ func TestHostVolumeManager(t *testing.T) {
 
 	t.Run("create", func(t *testing.T) {
 		// plugin doesn't exist
+		name := "vol-name"
 		req := &cstructs.ClientHostVolumeCreateRequest{
+			Name:     name,
 			ID:       "vol-id",
-			Name:     "vol-name",
 			PluginID: "nope",
 
 			RequestedCapacityMinBytes: 5,
@@ -58,6 +59,7 @@ func TestHostVolumeManager(t *testing.T) {
 		plug.createErr = errors.New("sad create")
 		_, err = hvm.Create(ctx, req)
 		must.ErrorIs(t, err, plug.createErr)
+		assertNotLocked(t, hvm, name)
 		plug.reset()
 
 		// error saving state, then error from cleanup attempt
@@ -65,24 +67,27 @@ func TestHostVolumeManager(t *testing.T) {
 		_, err = hvm.Create(ctx, req)
 		must.ErrorIs(t, err, cstate.ErrDBError)
 		must.ErrorIs(t, err, plug.deleteErr)
+		assertNotLocked(t, hvm, name)
 		plug.reset()
 
 		// error saving state, successful cleanup
 		_, err = hvm.Create(ctx, req)
 		must.ErrorIs(t, err, cstate.ErrDBError)
 		must.Eq(t, "vol-id", plug.deleted)
+		assertNotLocked(t, hvm, name)
 		plug.reset()
 
 		// happy path
 		hvm.stateMgr = memDB
 		resp, err := hvm.Create(ctx, req)
 		must.NoError(t, err)
-		must.Eq(t, &cstructs.ClientHostVolumeCreateResponse{
+		expectResp := &cstructs.ClientHostVolumeCreateResponse{
 			VolumeName:    "vol-name",
 			VolumeID:      "vol-id",
 			HostPath:      tmp,
 			CapacityBytes: 5,
-		}, resp)
+		}
+		must.Eq(t, expectResp, resp)
 		stateDBs, err := memDB.GetDynamicHostVolumes()
 		must.NoError(t, err)
 		// should be saved to state
@@ -90,30 +95,54 @@ func TestHostVolumeManager(t *testing.T) {
 		must.Eq(t, "vol-id", stateDBs[0].ID)
 		must.Eq(t, "vol-id", stateDBs[0].CreateReq.ID)
 		// should be registered with node
-		must.MapContainsKey(t, node.vols, "vol-name", must.Sprintf("no vol-name in %+v", node.vols))
+		must.MapContainsKey(t, node.vols, name, must.Sprintf("no %q in %+v", name, node.vols))
+		assertLocked(t, hvm, name)
+
+		// repeat create with same ID but different size may update the volume
+		req.RequestedCapacityMinBytes = 10
+		expectResp.CapacityBytes = 10
+		resp, err = hvm.Create(ctx, req)
+		must.NoError(t, err)
+		must.Eq(t, expectResp, resp)
+
+		// duplicate create with the same vol name but different ID should fail
+		_, err = hvm.Create(ctx, &cstructs.ClientHostVolumeCreateRequest{
+			Name:     name,
+			ID:       "different-vol-id",
+			PluginID: "test-plugin",
+		})
+		must.ErrorIs(t, err, ErrVolumeNameExists)
 	})
 
+	// despite being a subtest, this needs to run after "create"
 	t.Run("delete", func(t *testing.T) {
+		name := "vol-name"
+		// should be locked from "create" above
+		assertLocked(t, hvm, name)
+
 		// plugin doesn't exist
 		req := &cstructs.ClientHostVolumeDeleteRequest{
+			Name:     name,
 			ID:       "vol-id",
-			Name:     "vol-name",
 			PluginID: "nope",
 		}
 		_, err := hvm.Delete(ctx, req)
 		must.ErrorIs(t, err, ErrPluginNotExists)
+		assertLocked(t, hvm, name)
 
 		// error from plugin
 		req.PluginID = "test-plugin"
 		plug.deleteErr = errors.New("sad delete")
 		_, err = hvm.Delete(ctx, req)
 		must.ErrorIs(t, err, plug.deleteErr)
+		assertLocked(t, hvm, name)
 		plug.reset()
 
 		// error saving state
 		hvm.stateMgr = errDB
 		_, err = hvm.Delete(ctx, req)
 		must.ErrorIs(t, err, cstate.ErrDBError)
+		assertLocked(t, hvm, name)
 
 		// happy path
 		// add stuff that should be deleted
@@ -136,6 +165,7 @@ func TestHostVolumeManager(t *testing.T) {
 		stateVols, err := memDB.GetDynamicHostVolumes()
 		must.NoError(t, err)
 		must.Nil(t, stateVols, must.Sprint("vols should be deleted from state"))
+		assertNotLocked(t, hvm, name)
 	})
 }
 
@@ -181,6 +211,16 @@ func (p *fakePlugin) Delete(_ context.Context, req *cstructs.ClientHostVolumeDel
 	return nil
 }
 
+func assertLocked(t *testing.T, hvm *HostVolumeManager, name string) {
+	t.Helper()
+	must.True(t, hvm.locker.isLocked(name), must.Sprintf("vol name %q should be locked", name))
+}
+
+func assertNotLocked(t *testing.T, hvm *HostVolumeManager, name string) {
+	t.Helper()
+	must.False(t, hvm.locker.isLocked(name), must.Sprintf("vol name %q should not be locked", name))
+}
+
 func TestHostVolumeManager_restoreFromState(t *testing.T) {
 	log := testlog.HCLogger(t)
 	vol := &cstructs.HostVolumeState{
@@ -191,7 +231,7 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {
 			PluginID: "mkdir",
 		},
 	}
-	node := newFakeNode()
+	node := newFakeNode(t)
 
 	t.Run("no vols", func(t *testing.T) {
 		state := cstate.NewMemDB(log)
@@ -235,6 +275,8 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {
 		must.Eq(t, expect, vols)
 
 		must.DirExists(t, volPath)
+
+		assertLocked(t, hvm, "test-vol-name")
 	})
 
 	t.Run("state error", func(t *testing.T) {
@@ -290,15 +332,17 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {
 
 type fakeNode struct {
 	vols VolumeMap
+	log  hclog.Logger
 }
 
 func (n *fakeNode) updateVol(name string, volume *structs.ClientHostVolumeConfig) {
-	UpdateVolumeMap(n.vols, name, volume)
+	UpdateVolumeMap(n.log, n.vols, name, volume)
 }
 
-func newFakeNode() *fakeNode {
+func newFakeNode(t *testing.T) *fakeNode {
 	return &fakeNode{
 		vols: make(VolumeMap),
+		log:  testlog.HCLogger(t),
 	}
 }
 
@@ -6,6 +6,7 @@ package hostvolumemanager
 import (
 	"context"
 
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/nomad/structs"
 )
 
@@ -24,7 +25,7 @@ type VolumeMap map[string]*structs.ClientHostVolumeConfig
 //
 // Since it may mutate the map, the caller should make a copy
 // or acquire a lock as appropriate for their context.
-func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolumeConfig) (changed bool) {
+func UpdateVolumeMap(log hclog.Logger, volumes VolumeMap, name string, vol *structs.ClientHostVolumeConfig) (changed bool) {
 	current, exists := volumes[name]
 	if vol == nil {
 		if exists {
@@ -32,6 +33,12 @@ func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolu
 			changed = true
 		}
 	} else {
+		// if the volume already exists with no ID, it will be because it was
+		// added to client agent config after having been previously created
+		// as a dynamic vol. dynamic takes precedence, but log a warning.
+		if exists && current.ID == "" {
+			log.Warn("overriding static host volume with dynamic", "name", name, "id", vol.ID)
+		}
 		if !exists || !vol.Equal(current) {
 			volumes[name] = vol
 			changed = true
@@ -23,6 +23,8 @@ func TestUpdateVolumeMap(t *testing.T) {
 
 		expectMap    VolumeMap
 		expectChange bool
+
+		expectLog string
 	}{
 		{
 			name: "delete absent",
@@ -57,20 +59,33 @@
 			expectChange: false,
 		},
 		{
-			// this should not happen, but test anyway
+			// this should not happen with dynamic vols, but test anyway
 			name:         "change present",
-			vols:         VolumeMap{"changeme": {Path: "before"}},
+			vols:         VolumeMap{"changeme": {ID: "before"}},
 			volName:      "changeme",
-			vol:          &structs.ClientHostVolumeConfig{Path: "after"},
-			expectMap:    VolumeMap{"changeme": {Path: "after"}},
+			vol:          &structs.ClientHostVolumeConfig{ID: "after"},
+			expectMap:    VolumeMap{"changeme": {ID: "after"}},
 			expectChange: true,
 		},
+		{
+			// this should only happen during agent start, if a static vol has
+			// been added to config after a previous dynamic vol was created
+			// with the same name.
+			name:         "override static",
+			vols:         VolumeMap{"overrideme": {ID: ""}}, // static vols have no ID
+			volName:      "overrideme",
+			vol:          &structs.ClientHostVolumeConfig{ID: "dynamic-vol-id"},
+			expectMap:    VolumeMap{"overrideme": {ID: "dynamic-vol-id"}},
+			expectChange: true,
+			expectLog:    "overriding static host volume with dynamic: name=overrideme id=dynamic-vol-id",
+		},
 	}
 
 	for _, tc := range cases {
 		t.Run(tc.name, func(t *testing.T) {
+			log, getLogs := logRecorder(t)
 
-			changed := UpdateVolumeMap(tc.vols, tc.volName, tc.vol)
+			changed := UpdateVolumeMap(log, tc.vols, tc.volName, tc.vol)
 			must.Eq(t, tc.expectMap, tc.vols)
 
 			if tc.expectChange {
@@ -79,6 +94,7 @@ func TestUpdateVolumeMap(t *testing.T) {
 				must.False(t, changed, must.Sprint("expect volume not to have been changed"))
 			}
 
+			must.StrContains(t, getLogs(), tc.expectLog)
 		})
 	}
 }
@@ -87,7 +103,7 @@ func TestWaitForFirstFingerprint(t *testing.T) {
 	log := testlog.HCLogger(t)
 	tmp := t.TempDir()
 	memDB := state.NewMemDB(log)
-	node := newFakeNode()
+	node := newFakeNode(t)
 	hvm := NewHostVolumeManager(log, Config{
 		PluginDir:      "",
 		SharedMountDir: tmp,
@@ -9,6 +9,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/nomad/client/devicemanager"
 	hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
 	"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
@@ -50,7 +51,8 @@ SEND_BATCH:
 	// host volume updates
 	var hostVolChanged bool
 	c.batchNodeUpdates.batchHostVolumeUpdates(func(name string, vol *structs.ClientHostVolumeConfig) {
-		hostVolChanged = hvm.UpdateVolumeMap(newConfig.Node.HostVolumes, name, vol)
+		hostVolChanged = hvm.UpdateVolumeMap(c.logger.Named("node_updater").With("method", "batchFirstFingerprint"),
+			newConfig.Node.HostVolumes, name, vol)
 	})
 
 	// csi updates
@@ -140,7 +142,8 @@ func (c *Client) updateNodeFromHostVol(name string, vol *structs.ClientHostVolum
 		newConfig.Node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig)
 	}
 
-	changed := hvm.UpdateVolumeMap(newConfig.Node.HostVolumes, name, vol)
+	changed := hvm.UpdateVolumeMap(c.logger.Named("node_updater").With("method", "updateNodeFromHostVol"),
+		newConfig.Node.HostVolumes, name, vol)
 	if changed {
 		c.config = newConfig
 		c.updateNode()
@@ -342,6 +345,8 @@ func (c *Client) updateNodeFromDevicesLocked(devices []*structs.NodeDeviceResour
 // Once ready, the batches can be flushed and toggled to stop batching and forward
 // all updates to a configured callback to be performed incrementally
 type batchNodeUpdates struct {
+	logger hclog.Logger
+
 	// access to driver fields must hold driversMu lock
 	drivers        map[string]*structs.DriverInfo
 	driversBatched bool
@@ -368,12 +373,14 @@ type batchNodeUpdates struct {
 }
 
 func newBatchNodeUpdates(
+	logger hclog.Logger,
 	driverCB drivermanager.UpdateNodeDriverInfoFn,
 	devicesCB devicemanager.UpdateNodeDevicesFn,
 	csiCB csimanager.UpdateNodeCSIInfoFunc,
 	hostVolumeCB hvm.HostVolumeNodeUpdater) *batchNodeUpdates {
 
 	return &batchNodeUpdates{
+		logger:   logger,
 		drivers:  make(map[string]*structs.DriverInfo),
 		driverCB: driverCB,
 		devices:  []*structs.NodeDeviceResource{},
@@ -394,7 +401,8 @@ func (b *batchNodeUpdates) updateNodeFromHostVolume(name string, vol *structs.Cl
 		b.hostVolumeCB(name, vol) // => Client.updateNodeFromHostVol()
 		return
 	}
-	hvm.UpdateVolumeMap(b.hostVolumes, name, vol)
+	hvm.UpdateVolumeMap(b.logger.Named("node_updater").With("method", "updateNodeFromHostVolume"),
+		b.hostVolumes, name, vol)
 }
 
 // this one runs on client start