mirror of https://github.com/kemko/nomad.git (synced 2026-01-06 18:35:44 +03:00)

dynamic host volumes: volume fingerprinting (#24613)

and expand the demo a bit

committed by Tim Gross
parent 76641c8081, commit e76f5e0b4c
@@ -517,6 +517,8 @@ type DriverInfo struct {
type HostVolumeInfo struct {
Path string
ReadOnly bool
// ID is set for dynamic host volumes only.
ID string
}

// HostNetworkInfo is used to return metadata about a given HostNetwork

@@ -411,6 +411,7 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.updateNodeFromDriver,
c.updateNodeFromDevices,
c.updateNodeFromCSI,
c.updateNodeFromHostVol,
)

// Initialize the server manager
@@ -535,16 +536,14 @@ func NewClient(cfg *config.Config, consulCatalog consul.CatalogAPI, consulProxie
c.devicemanager = devManager
c.pluginManagers.RegisterAndRun(devManager)

c.hostVolumeManager, err = hvm.NewHostVolumeManager(logger,
c.stateDB, hostVolumeRequestTimeout,
cfg.HostVolumePluginDir,
cfg.AllocMountsDir)
if err != nil {
// NewHostVolumeManager will only err if it fails to read state store,
// or if one or more required plugins do not exist, so halt the client
// because something needs to be fixed by a cluster admin.
return nil, err
}
// set up dynamic host volume manager
c.hostVolumeManager = hvm.NewHostVolumeManager(logger, hvm.Config{
PluginDir: cfg.HostVolumePluginDir,
SharedMountDir: cfg.AllocMountsDir,
StateMgr: c.stateDB,
UpdateNodeVols: c.batchNodeUpdates.updateNodeFromHostVolume,
})
c.pluginManagers.RegisterAndRun(c.hostVolumeManager)

// Set up the service registration wrapper using the Consul and Nomad
// implementations. The Nomad implementation is only ever used on the

@@ -50,7 +50,7 @@ func (v *HostVolume) Delete(
ctx, cancelFn := v.requestContext()
defer cancelFn()

_, err := v.c.hostVolumeManager.Delete(ctx, req) // db TODO(1.10.0): cresp is empty... why return it?
_, err := v.c.hostVolumeManager.Delete(ctx, req)
if err != nil {
v.c.logger.Error("failed to delete host volume", "ID", req.ID, "error", err)
return err

@@ -6,13 +6,13 @@ package client
import (
"path/filepath"
"testing"
"time"

"github.com/hashicorp/nomad/ci"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)

@@ -26,16 +26,22 @@ func TestHostVolume(t *testing.T) {
client.stateDB = memdb

tmp := t.TempDir()
var err error
manager := hvm.NewHostVolumeManager(testlog.HCLogger(t), hvm.Config{
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: tmp,
})
client.hostVolumeManager = manager
expectDir := filepath.Join(tmp, "test-vol-id")
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", tmp)
must.NoError(t, err)

t.Run("happy", func(t *testing.T) {

/* create */

req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
Name: "test-vol-name",
ID: "test-vol-id",
PluginID: "mkdir", // real plugin really makes a dir
}
var resp cstructs.ClientHostVolumeCreateResponse
@@ -56,8 +62,19 @@ func TestHostVolume(t *testing.T) {
CreateReq: req,
}
must.Eq(t, expectState, vols[0])
// and should be fingerprinted
must.Eq(t, hvm.VolumeMap{
req.Name: {
ID: req.ID,
Name: req.Name,
Path: expectDir,
},
}, client.Node().HostVolumes)

/* delete */

delReq := &cstructs.ClientHostVolumeDeleteRequest{
Name: "test-vol-name",
ID: "test-vol-id",
PluginID: "mkdir",
HostPath: expectDir,
@@ -72,6 +89,8 @@ func TestHostVolume(t *testing.T) {
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
// and the fingerprint, too
must.Eq(t, map[string]*structs.ClientHostVolumeConfig{}, client.Node().HostVolumes)
})

t.Run("missing plugin", func(t *testing.T) {
@@ -92,9 +111,12 @@ func TestHostVolume(t *testing.T) {

t.Run("error from plugin", func(t *testing.T) {
// "mkdir" plugin can't create a directory within a file
client.hostVolumeManager, err = hvm.NewHostVolumeManager(testlog.HCLogger(t),
client.stateDB, time.Second, "/no/ext/plugins", "host_volume_endpoint_test.go")
must.NoError(t, err)
client.hostVolumeManager = hvm.NewHostVolumeManager(testlog.HCLogger(t), hvm.Config{
StateMgr: client.stateDB,
UpdateNodeVols: client.updateNodeFromHostVol,
PluginDir: "/no/ext/plugins",
SharedMountDir: "host_volume_endpoint_test.go",
})

req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",

@@ -59,6 +59,20 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
"path", path)
log.Debug("running plugin")

resp := &HostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
}

if _, err := os.Stat(path); err == nil {
// already exists
return resp, nil
} else if !os.IsNotExist(err) {
// doesn't exist, but some other path error
log.Debug("error with plugin", "error", err)
return nil, err
}

err := os.Mkdir(path, 0o700)
if err != nil {
log.Debug("error with plugin", "error", err)
@@ -66,10 +80,7 @@ func (p *HostVolumePluginMkdir) Create(_ context.Context,
}

log.Debug("plugin ran successfully")
return &HostVolumePluginCreateResponse{
Path: path,
SizeBytes: 0,
}, nil
return resp, nil
}

func (p *HostVolumePluginMkdir) Delete(_ context.Context, req *cstructs.ClientHostVolumeDeleteRequest) error {

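The new stat-before-mkdir logic above makes the built-in "mkdir" plugin's Create idempotent: an already existing path is reported as success, only a genuine "does not exist" result leads to Mkdir, and any other stat error is returned. The following standalone sketch shows that same pattern outside of Nomad; ensureVolumeDir and the example directory name are illustrative, not part of the Nomad API.

// Sketch of the idempotent create pattern used by the "mkdir" plugin above.
package main

import (
    "fmt"
    "os"
    "path/filepath"
)

func ensureVolumeDir(path string) (string, error) {
    if _, err := os.Stat(path); err == nil {
        return path, nil // already exists: creating is a no-op
    } else if !os.IsNotExist(err) {
        return "", err // stat failed for some other reason (e.g. permissions)
    }
    if err := os.Mkdir(path, 0o700); err != nil {
        return "", err
    }
    return path, nil
}

func main() {
    dir := filepath.Join(os.TempDir(), "example-vol-id")
    for i := 0; i < 2; i++ { // the second call hits the "already exists" branch
        p, err := ensureVolumeDir(dir)
        fmt.Println(p, err)
    }
}
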
@@ -7,12 +7,13 @@ import (
"context"
"errors"
"path/filepath"
"time"
"sync"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-multierror"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
)

var (
@@ -26,42 +27,68 @@ type HostVolumeStateManager interface {
DeleteDynamicHostVolume(string) error
}

type Config struct {
// PluginDir is where external plugins may be found.
PluginDir string

// SharedMountDir is where plugins should place the directory
// that will later become a volume HostPath
SharedMountDir string

// StateMgr manages client state to restore on agent restarts.
StateMgr HostVolumeStateManager

// UpdateNodeVols is run to update the node when a volume is created
// or deleted.
UpdateNodeVols HostVolumeNodeUpdater
}

type HostVolumeManager struct {
pluginDir string
sharedMountDir string
stateMgr HostVolumeStateManager

log hclog.Logger
updateNodeVols HostVolumeNodeUpdater
log hclog.Logger
}

func NewHostVolumeManager(logger hclog.Logger,
state HostVolumeStateManager, restoreTimeout time.Duration,
pluginDir, sharedMountDir string) (*HostVolumeManager, error) {

log := logger.Named("host_volume_mgr")

// db TODO(1.10.0): how do we define the external mounter plugins? plugin configs?
hvm := &HostVolumeManager{
pluginDir: pluginDir,
sharedMountDir: sharedMountDir,
stateMgr: state,
log: log,
func NewHostVolumeManager(logger hclog.Logger, config Config) *HostVolumeManager {
// db TODO(1.10.0): document plugin config options
return &HostVolumeManager{
pluginDir: config.PluginDir,
sharedMountDir: config.SharedMountDir,
stateMgr: config.StateMgr,
updateNodeVols: config.UpdateNodeVols,
log: logger.Named("host_volume_manager"),
}
}

if err := hvm.restoreState(state, restoreTimeout); err != nil {
func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {
if req == nil || resp == nil {
return nil
}
return &structs.ClientHostVolumeConfig{
Name: req.Name,
ID: req.ID,
Path: resp.Path,

// dynamic volumes, like CSI, have more robust `capabilities`,
// so we always set ReadOnly to false, and let the scheduler
// decide when to ignore this and check capabilities instead.
ReadOnly: false,
}
}

func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, error) {
vols, err := hvm.stateMgr.GetDynamicHostVolumes()
if err != nil {
return nil, err
}

return hvm, nil
}
volumes := make(VolumeMap)
var mut sync.Mutex

func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout time.Duration) error {
vols, err := state.GetDynamicHostVolumes()
if err != nil {
return err
}
if len(vols) == 0 {
return nil // nothing to do
return volumes, nil // nothing to do
}

// re-"create" the volumes - plugins have the best knowledge of their
@@ -76,17 +103,20 @@ func (hvm *HostVolumeManager) restoreState(state HostVolumeStateManager, timeout
return err
}

ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
if _, err := plug.Create(ctx, vol.CreateReq); err != nil {
resp, err := plug.Create(ctx, vol.CreateReq)
if err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
return nil
}
mut.Lock()
volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
mut.Unlock()
return nil
})
}
mErr := group.Wait()
return helper.FlattenMultierror(mErr.ErrorOrNil())
return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
}

func (hvm *HostVolumeManager) getPlugin(id string) (HostVolumePlugin, error) {
@@ -139,9 +169,11 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
return nil, helper.FlattenMultierror(err)
}

// db TODO(1.10.0): now we need to add the volume to the node fingerprint!
hvm.updateNodeVols(req.Name, genVolConfig(req, pluginResp))

resp := &cstructs.ClientHostVolumeCreateResponse{
VolumeName: req.Name,
VolumeID: req.ID,
HostPath: pluginResp.Path,
CapacityBytes: pluginResp.SizeBytes,
}
@@ -162,12 +194,17 @@ func (hvm *HostVolumeManager) Delete(ctx context.Context,
return nil, err
}

resp := &cstructs.ClientHostVolumeDeleteResponse{}

if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
hvm.log.Error("failed to delete volume in state", "volume_id", req.ID, "error", err)
return nil, err // bail so a user may retry
}

hvm.updateNodeVols(req.Name, nil)

resp := &cstructs.ClientHostVolumeDeleteResponse{
VolumeName: req.Name,
VolumeID: req.ID,
}

return resp, nil
}

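The reworked restoreFromState above re-"creates" each stored volume concurrently, logs and skips plugin failures instead of failing the whole restore, and collects the results into a shared VolumeMap under a mutex. Here is a standalone sketch of that pattern under those assumptions; it uses a plain WaitGroup and simplified local types rather than Nomad's error group, plugins, or structs, and all names and paths are illustrative.

// Sketch of the concurrent, failure-tolerant restore pattern.
package main

import (
    "fmt"
    "sync"
)

type storedVolume struct{ ID, Name string }

// recreate stands in for asking the volume's plugin to re-create its mount.
func recreate(v storedVolume) (string, error) {
    if v.ID == "broken" {
        return "", fmt.Errorf("plugin failed for %s", v.ID)
    }
    return "/opt/vols/" + v.ID, nil
}

func main() {
    stored := []storedVolume{{"vol-1", "fast-data"}, {"broken", "bad"}, {"vol-2", "scratch"}}

    volumes := map[string]string{} // name -> host path, like VolumeMap
    var mut sync.Mutex
    var wg sync.WaitGroup

    for _, v := range stored {
        wg.Add(1)
        go func(v storedVolume) {
            defer wg.Done()
            path, err := recreate(v)
            if err != nil {
                fmt.Println("skip:", err) // plugin errors are only logged
                return
            }
            mut.Lock()
            volumes[v.Name] = path
            mut.Unlock()
        }(v)
    }
    wg.Wait()
    fmt.Println(volumes) // only the healthy volumes get fingerprinted
}
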
@@ -4,6 +4,7 @@
package hostvolumemanager

import (
"context"
"path/filepath"
"testing"
"time"
@@ -11,6 +12,7 @@ import (
cstate "github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)

@@ -21,10 +23,12 @@ func TestNewHostVolumeManager_restoreState(t *testing.T) {
vol := &cstructs.HostVolumeState{
ID: "test-vol-id",
CreateReq: &cstructs.ClientHostVolumeCreateRequest{
Name: "test-vol-name",
ID: "test-vol-id",
PluginID: "mkdir",
},
}
fNode := newFakeNode()

t.Run("happy", func(t *testing.T) {
// put our volume in state
@@ -34,20 +38,62 @@ func TestNewHostVolumeManager_restoreState(t *testing.T) {
// new volume manager should load it from state and run Create,
// resulting in a volume directory in this mountDir.
mountDir := t.TempDir()
volPath := filepath.Join(mountDir, vol.ID)

_, err := NewHostVolumeManager(log, state, time.Second, "/wherever", mountDir)
hvm := NewHostVolumeManager(log, Config{
StateMgr: state,
UpdateNodeVols: fNode.updateVol,
PluginDir: "/wherever",
SharedMountDir: mountDir,
})

ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
vols, err := hvm.restoreFromState(ctx)
must.NoError(t, err)

volPath := filepath.Join(mountDir, vol.ID)
expect := map[string]*structs.ClientHostVolumeConfig{
"test-vol-name": {
Name: "test-vol-name",
ID: "test-vol-id",
Path: volPath,
ReadOnly: false,
},
}
must.Eq(t, expect, vols)

must.DirExists(t, volPath)
})

t.Run("get error", func(t *testing.T) {
state := &cstate.ErrDB{}
_, err := NewHostVolumeManager(log, state, time.Second, "/wherever", "/wherever")
hvm := NewHostVolumeManager(log, Config{
StateMgr: state,
UpdateNodeVols: fNode.updateVol,
PluginDir: "/wherever",
SharedMountDir: "/wherever",
})
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
vols, err := hvm.restoreFromState(ctx)
// error loading state should break the world
must.ErrorIs(t, err, cstate.ErrDBError)
must.Nil(t, vols)
})

// db TODO: test plugin error
}

type fakeNode struct {
vols VolumeMap
}

func (n *fakeNode) updateVol(name string, volume *structs.ClientHostVolumeConfig) {
UpdateVolumeMap(n.vols, name, volume)
}

func newFakeNode() *fakeNode {
return &fakeNode{
vols: make(VolumeMap),
}
}

client/hostvolumemanager/volume_fingerprint.go (new file, 65 lines)
@@ -0,0 +1,65 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package hostvolumemanager

import (
"context"

"github.com/hashicorp/nomad/nomad/structs"
)

// this file is for fingerprinting *volumes*
// *plugins* are detected in client/fingerprint/dynamic_host_volumes.go

// HostVolumeNodeUpdater is used to add or remove volumes from the Node.
type HostVolumeNodeUpdater func(name string, volume *structs.ClientHostVolumeConfig)

// VolumeMap keys are volume `name`s, identical to Node.HostVolumes.
type VolumeMap map[string]*structs.ClientHostVolumeConfig

// UpdateVolumeMap returns true if it changes the provided `volumes` map.
// If `vol` is nil, key `name` will be removed from the map, if present.
// If it is not nil, `name: vol` will be set on the map, if different.
//
// Since it may mutate the map, the caller should make a copy
// or acquire a lock as appropriate for their context.
func UpdateVolumeMap(volumes VolumeMap, name string, vol *structs.ClientHostVolumeConfig) (changed bool) {
current, exists := volumes[name]
if vol == nil {
if exists {
delete(volumes, name)
changed = true
}
} else {
if !exists || !vol.Equal(current) {
volumes[name] = vol
changed = true
}
}
return changed
}

// WaitForFirstFingerprint implements client.FingerprintingPluginManager
func (hvm *HostVolumeManager) WaitForFirstFingerprint(ctx context.Context) <-chan struct{} {
// the fingerprint manager puts batchFirstFingerprintsTimeout (50 seconds)
// on the context that it sends to us here so we don't need another
// timeout. we just need to cancel to report when we are done.
ctx, cancel := context.WithCancel(ctx)
defer cancel()
volumes, err := hvm.restoreFromState(ctx)
if err != nil {
hvm.log.Error("failed to restore state", "error", err)
return ctx.Done()
}
for name, vol := range volumes {
hvm.updateNodeVols(name, vol) // => batchNodeUpdates.updateNodeFromHostVolume()
}
return ctx.Done()
}
func (hvm *HostVolumeManager) Run() {}
func (hvm *HostVolumeManager) Shutdown() {}
func (hvm *HostVolumeManager) PluginType() string {
// "Plugin"Type is misleading, because this is for *volumes* but ok.
return "dynamic_host_volume"
}
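UpdateVolumeMap is the core of the new fingerprinting: nil deletes the named entry, a non-nil value upserts it, and the boolean return says whether anything actually changed, which is what gates a node update. The sketch below demonstrates those semantics as a standalone program; VolConfig, the "fast-data" volume name, and the paths are simplified stand-ins for the Nomad types (it compares structs directly where the real code calls Equal).

// Standalone sketch of the UpdateVolumeMap semantics above.
package main

import "fmt"

type VolConfig struct {
    ID, Name, Path string
}

type VolumeMap map[string]*VolConfig

func UpdateVolumeMap(volumes VolumeMap, name string, vol *VolConfig) (changed bool) {
    current, exists := volumes[name]
    if vol == nil {
        if exists {
            delete(volumes, name)
            changed = true
        }
    } else if !exists || *vol != *current {
        volumes[name] = vol
        changed = true
    }
    return changed
}

func main() {
    vols := make(VolumeMap)

    // add: not present yet, so the map changes
    fmt.Println(UpdateVolumeMap(vols, "fast-data", &VolConfig{ID: "vol-1", Name: "fast-data", Path: "/opt/vols/vol-1"})) // true

    // re-add the same value: no change reported, so no node update needed
    fmt.Println(UpdateVolumeMap(vols, "fast-data", &VolConfig{ID: "vol-1", Name: "fast-data", Path: "/opt/vols/vol-1"})) // false

    // delete: nil removes the entry
    fmt.Println(UpdateVolumeMap(vols, "fast-data", nil)) // true
    fmt.Println(len(vols))                               // 0
}
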
client/hostvolumemanager/volume_fingerprint_test.go (new file, 81 lines)
@@ -0,0 +1,81 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1

package hostvolumemanager

import (
"testing"

"github.com/hashicorp/nomad/nomad/structs"
"github.com/shoenig/test/must"
)

func TestUpdateVolumeMap(t *testing.T) {
cases := []struct {
name string

vols VolumeMap
volName string
vol *structs.ClientHostVolumeConfig

expectMap VolumeMap
expectChange bool
}{
{
name: "delete absent",
vols: VolumeMap{},
volName: "anything",
vol: nil,
expectMap: VolumeMap{},
expectChange: false,
},
{
name: "delete present",
vols: VolumeMap{"deleteme": {}},
volName: "deleteme",
vol: nil,
expectMap: VolumeMap{},
expectChange: true,
},
{
name: "add absent",
vols: VolumeMap{},
volName: "addme",
vol: &structs.ClientHostVolumeConfig{},
expectMap: VolumeMap{"addme": {}},
expectChange: true,
},
{
name: "add present",
vols: VolumeMap{"ignoreme": {}},
volName: "ignoreme",
vol: &structs.ClientHostVolumeConfig{},
expectMap: VolumeMap{"ignoreme": {}},
expectChange: false,
},
{
// this should not happen, but test anyway
name: "change present",
vols: VolumeMap{"changeme": {Path: "before"}},
volName: "changeme",
vol: &structs.ClientHostVolumeConfig{Path: "after"},
expectMap: VolumeMap{"changeme": {Path: "after"}},
expectChange: true,
},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {

changed := UpdateVolumeMap(tc.vols, tc.volName, tc.vol)
must.Eq(t, tc.expectMap, tc.vols)

if tc.expectChange {
must.True(t, changed, must.Sprint("expect volume to have been changed"))
} else {
must.False(t, changed, must.Sprint("expect volume not to have been changed"))
}

})
}
}

@@ -10,6 +10,7 @@ import (
"time"

"github.com/hashicorp/nomad/client/devicemanager"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
"github.com/hashicorp/nomad/client/pluginmanager/drivermanager"
"github.com/hashicorp/nomad/nomad/structs"
@@ -30,7 +31,7 @@ func (c *Client) batchFirstFingerprints() {

ch, err := c.pluginManagers.WaitForFirstFingerprint(ctx)
if err != nil {
c.logger.Warn("failed to batch initial fingerprint updates, switching to incemental updates")
c.logger.Warn("failed to batch initial fingerprint updates, switching to incremental updates")
goto SEND_BATCH
}

@@ -46,6 +47,12 @@ SEND_BATCH:

newConfig := c.config.Copy()

// host volume updates
var hostVolChanged bool
c.batchNodeUpdates.batchHostVolumeUpdates(func(name string, vol *structs.ClientHostVolumeConfig) {
hostVolChanged = hvm.UpdateVolumeMap(newConfig.Node.HostVolumes, name, vol)
})

// csi updates
var csiChanged bool
c.batchNodeUpdates.batchCSIUpdates(func(name string, info *structs.CSIInfo) {
@@ -85,7 +92,7 @@ SEND_BATCH:
})

// only update the node if changes occurred
if driverChanged || devicesChanged || csiChanged {
if driverChanged || devicesChanged || csiChanged || hostVolChanged {
c.config = newConfig
c.updateNode()
}
@@ -123,6 +130,23 @@ func (c *Client) updateNodeFromCSI(name string, info *structs.CSIInfo) {
}
}

func (c *Client) updateNodeFromHostVol(name string, vol *structs.ClientHostVolumeConfig) {
c.configLock.Lock()
defer c.configLock.Unlock()

newConfig := c.config.Copy()

if newConfig.Node.HostVolumes == nil {
newConfig.Node.HostVolumes = make(map[string]*structs.ClientHostVolumeConfig)
}

changed := hvm.UpdateVolumeMap(newConfig.Node.HostVolumes, name, vol)
if changed {
c.config = newConfig
c.updateNode()
}
}

// updateNodeFromCSIControllerLocked makes the changes to the node from a csi
// update but does not send the update to the server. c.configLock must be held
// before calling this func.
@@ -336,12 +360,18 @@ type batchNodeUpdates struct {
csiBatched bool
csiCB csimanager.UpdateNodeCSIInfoFunc
csiMu sync.Mutex

hostVolumes hvm.VolumeMap
hostVolumesBatched bool
hostVolumeCB hvm.HostVolumeNodeUpdater
hostVolumeMu sync.Mutex
}

func newBatchNodeUpdates(
driverCB drivermanager.UpdateNodeDriverInfoFn,
devicesCB devicemanager.UpdateNodeDevicesFn,
csiCB csimanager.UpdateNodeCSIInfoFunc) *batchNodeUpdates {
csiCB csimanager.UpdateNodeCSIInfoFunc,
hostVolumeCB hvm.HostVolumeNodeUpdater) *batchNodeUpdates {

return &batchNodeUpdates{
drivers: make(map[string]*structs.DriverInfo),
@@ -351,9 +381,36 @@ func newBatchNodeUpdates(
csiNodePlugins: make(map[string]*structs.CSIInfo),
csiControllerPlugins: make(map[string]*structs.CSIInfo),
csiCB: csiCB,
hostVolumes: make(hvm.VolumeMap),
hostVolumeCB: hostVolumeCB,
}
}

// this is the one that the volume manager runs
func (b *batchNodeUpdates) updateNodeFromHostVolume(name string, vol *structs.ClientHostVolumeConfig) {
b.hostVolumeMu.Lock()
defer b.hostVolumeMu.Unlock()
if b.hostVolumesBatched {
b.hostVolumeCB(name, vol) // => Client.updateNodeFromHostVol()
return
}
hvm.UpdateVolumeMap(b.hostVolumes, name, vol)
}

// this one runs on client start
func (b *batchNodeUpdates) batchHostVolumeUpdates(f hvm.HostVolumeNodeUpdater) error {
b.hostVolumeMu.Lock()
defer b.hostVolumeMu.Unlock()
if b.hostVolumesBatched {
return fmt.Errorf("host volume updates already batched")
}
b.hostVolumesBatched = true
for name, vol := range b.hostVolumes {
f(name, vol) // => c.batchNodeUpdates.batchHostVolumeUpdates(FUNC
}
return nil
}

// updateNodeFromCSI implements csimanager.UpdateNodeCSIInfoFunc and is used in
// the csi manager to send csi fingerprints to the server.
func (b *batchNodeUpdates) updateNodeFromCSI(plugin string, info *structs.CSIInfo) {

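The batchNodeUpdates changes above follow the same batch-then-callback pattern the driver, device, and CSI fingerprints use: volume updates that arrive before the first fingerprint batch are accumulated in a map, and once the batch is flushed, later updates go straight to the client callback. Below is a simplified standalone sketch of that pattern; it folds the flush function and the post-batch callback into a single field for brevity, and all names (batcher, flush, the example volumes) are illustrative rather than Nomad's.

// Sketch of the "accumulate until first batch, then call through" pattern.
package main

import (
    "fmt"
    "sync"
)

type batcher struct {
    mu      sync.Mutex
    batched bool
    pending map[string]string // volume name -> host path
    cb      func(name, path string)
}

func (b *batcher) update(name, path string) {
    b.mu.Lock()
    defer b.mu.Unlock()
    if b.batched {
        b.cb(name, path) // after the first batch, update the node directly
        return
    }
    b.pending[name] = path // before that, just accumulate
}

func (b *batcher) flush(f func(name, path string)) {
    b.mu.Lock()
    defer b.mu.Unlock()
    b.batched = true
    b.cb = f
    for name, path := range b.pending {
        f(name, path)
    }
}

func main() {
    node := map[string]string{}
    b := &batcher{pending: map[string]string{}}

    b.update("fast-data", "/opt/vols/vol-1") // arrives before the batch: buffered
    b.flush(func(name, path string) { node[name] = path })
    b.update("scratch", "/opt/vols/vol-2") // arrives after: applied immediately

    fmt.Println(node)
}
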
@@ -37,6 +37,9 @@ type ClientHostVolumeCreateRequest struct {
}

type ClientHostVolumeCreateResponse struct {
VolumeName string
VolumeID string

// Path is the host path where the volume's mount point was created. We send
// this back to the server to make debugging easier.
HostPath string
@@ -50,6 +53,8 @@ type ClientHostVolumeDeleteRequest struct {
// ID is a UUID-like string generated by the server.
ID string

Name string

// PluginID is the name of the host volume plugin on the client that will be
// used for deleting the volume. If omitted, the client will use its default
// built-in plugin.
@@ -68,4 +73,7 @@ type ClientHostVolumeDeleteRequest struct {
Parameters map[string]string
}

type ClientHostVolumeDeleteResponse struct{}
type ClientHostVolumeDeleteResponse struct {
VolumeName string
VolumeID string
}

demo/hostvolume/check.sh (new executable file, 14 lines)
@@ -0,0 +1,14 @@
#!/usr/bin/env bash
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

set -xeuo pipefail

nomad volume status -type=host -verbose
nomad operator api /v1/nodes | jq '.[].HostVolumes'

addr="$(nomad service info -json job | jq -r '.[0].Address'):8000"
curl -sS "$addr/external/" | grep hi
curl -sS "$addr/internal/" | grep hi

echo '💚 looks good! 💚'
demo/hostvolume/e2e.sh (new executable file, 9 lines)
@@ -0,0 +1,9 @@
#!/usr/bin/env bash
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

set -xeuo pipefail

./setup.sh
./check.sh
./teardown.sh
demo/hostvolume/external-plugin.volume.hcl (new file, 22 lines)
@@ -0,0 +1,22 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

name = "external-plugin"
type = "host"
# the executable named `example-plugin-mkfs` must be placed in the
# -host-volume-plugin-dir (config: client.host_volume_plugin_dir)
# or you will get an error creating the volume:
# * could not place volume "external-plugin": no node meets constraints
# The default location is <data-dir>/host_volume_plugins
plugin_id = "example-plugin-mkfs"
capacity_min = "50mb"
capacity_max = "50mb"

capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}

parameters {
a = "ayy"
}
(deleted file, 19 lines)
@@ -1,19 +0,0 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1
name = "test"
type = "host"
plugin_id = "example-host-volume"
capacity_min = "50mb"
capacity_max = "50mb"

capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}

parameters {
a = "ayy"
}

# TODO(1.10.0): don't require node_pool
node_pool = "default"
demo/hostvolume/internal-plugin.volume.hcl (new file, 14 lines)
@@ -0,0 +1,14 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

name = "internal-plugin"
type = "host"
# this plugin is built into Nomad;
# it simply creates a directory.
plugin_id = "mkdir"

capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}

demo/hostvolume/job.nomad.hcl (new file, 48 lines)
@@ -0,0 +1,48 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

job "job" {
group "g" {
task "t" {
driver = "docker"
config {
image = "python:slim"
command = "bash"
args = ["-xc", <<-EOF
for dir in internal external; do
touch ${NOMAD_TASK_DIR}/$dir/hiii
done
python -m http.server -d ${NOMAD_TASK_DIR} --bind=::
EOF
]
ports = ["http"]
}
volume_mount {
volume = "int"
destination = "${NOMAD_TASK_DIR}/internal"
}
volume_mount {
volume = "ext"
destination = "${NOMAD_TASK_DIR}/external"
}
}
volume "int" {
type = "host"
source = "internal-plugin"
}
volume "ext" {
type = "host"
source = "external-plugin"
}
network {
port "http" {
static = 8000
}
}
service {
name = "job"
port = "http"
provider = "nomad"
}
}
}
demo/hostvolume/setup.sh (new executable file, 14 lines)
@@ -0,0 +1,14 @@
#!/usr/bin/env bash
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

set -xeuo pipefail

nomad volume create external-plugin.volume.hcl
nomad volume create internal-plugin.volume.hcl

nomad job run job.nomad.hcl

nomad volume status -type=host -verbose
nomad operator api /v1/nodes | jq '.[].HostVolumes'

demo/hostvolume/teardown.sh (new executable file, 17 lines)
@@ -0,0 +1,17 @@
#!/usr/bin/env bash
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1

set -xeuo pipefail

nomad job stop job || true

for _ in {1..5}; do
sleep 3
ids="$(nomad volume status -type=host -verbose | awk '/ternal-plugin/ {print$1}')"
test -z "$ids" && break
for id in $ids; do
nomad volume delete -type=host "$id" || continue
done
done

@@ -610,6 +610,7 @@ func (v *HostVolume) deleteVolume(vol *structs.HostVolume) error {
method := "ClientHostVolume.Delete"
cReq := &cstructs.ClientHostVolumeDeleteRequest{
ID: vol.ID,
Name: vol.Name,
PluginID: vol.PluginID,
NodeID: vol.NodeID,
HostPath: vol.HostPath,

@@ -31,6 +31,18 @@ type ClientHostVolumeConfig struct {
Name string `hcl:",key"`
Path string `hcl:"path"`
ReadOnly bool `hcl:"read_only"`
// ID is set for dynamic host volumes only.
ID string `hcl:"-"`
}

func (p *ClientHostVolumeConfig) Equal(o *ClientHostVolumeConfig) bool {
if p == nil && o == nil {
return true
}
if p == nil || o == nil {
return false
}
return *p == *o
}

func (p *ClientHostVolumeConfig) Copy() *ClientHostVolumeConfig {