dynamic host volumes: send register to client for fingerprint (#24802)

When we register a volume without a plugin, we need to send a client RPC so that
the node fingerprint can be updated. The registered volume also needs to be
written to client state so that we can restore the fingerprint after a restart.

Co-authored-by: Daniel Bennett <dbennett@hashicorp.com>
Author: Tim Gross
Date: 2025-01-08 16:58:58 -05:00 (committed by GitHub)
Parent: 2bfe817721
Commit: 4a65b21aab
16 changed files with 498 additions and 151 deletions
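
For context, a plugin-less volume spec of the kind this change supports looks roughly like the sketch below, adapted from the test fixture added later in this commit. The host_path and node_id values are hypothetical placeholders for a volume that was created out-of-band.

name      = "no-plugin"
type      = "host"
host_path = "/srv/example-volume" # placeholder: absolute path of the volume created out-of-band
node_id   = "<node-id>"           # placeholder: ID of the node where that path exists

capability {
  access_mode     = "single-node-writer"
  attachment_mode = "file-system"
}

A spec like this is passed to 'volume register' rather than 'volume create', since no plugin runs on the client.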

View File

@@ -48,7 +48,7 @@ type HostVolume struct {
// updated on the client
RequestedCapacityMinBytes int64 `mapstructure:"capacity_min" hcl:"capacity_min"`
RequestedCapacityMaxBytes int64 `mapstructure:"capacity_max" hcl:"capacity_max"`
CapacityBytes int64
CapacityBytes int64 `mapstructure:"capacity" hcl:"capacity"`
// RequestedCapabilities defines the options available to group.volume
// blocks. The scheduler checks against the listed capability blocks and

View File

@@ -43,6 +43,24 @@ func (v *HostVolume) Create(
return nil
}
func (v *HostVolume) Register(
req *cstructs.ClientHostVolumeRegisterRequest,
resp *cstructs.ClientHostVolumeRegisterResponse) error {
defer metrics.MeasureSince([]string{"client", "host_volume", "register"}, time.Now())
ctx, cancelFn := v.requestContext()
defer cancelFn()
err := v.c.hostVolumeManager.Register(ctx, req)
if err != nil {
v.c.logger.Error("failed to register host volume", "name", req.Name, "error", err)
return err
}
v.c.logger.Info("registered host volume", "id", req.ID, "path", req.HostPath)
return nil
}
func (v *HostVolume) Delete(
req *cstructs.ClientHostVolumeDeleteRequest,
resp *cstructs.ClientHostVolumeDeleteResponse) error {

View File

@@ -5,6 +5,7 @@ package client
import (
"path/filepath"
"sort"
"testing"
"github.com/hashicorp/nomad/ci"
@@ -12,7 +13,7 @@ import (
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/shoenig/test/must"
)
@@ -33,26 +34,27 @@ func TestHostVolume(t *testing.T) {
SharedMountDir: tmp,
})
client.hostVolumeManager = manager
expectDir := filepath.Join(tmp, "test-vol-id")
hostPathCreate := filepath.Join(tmp, "test-vol-id-1")
hostPathRegister := t.TempDir()
t.Run("happy", func(t *testing.T) {
/* create */
req := &cstructs.ClientHostVolumeCreateRequest{
Name: "test-vol-name",
ID: "test-vol-id",
Name: "created-volume",
ID: "test-vol-id-1",
PluginID: "mkdir", // real plugin really makes a dir
}
var resp cstructs.ClientHostVolumeCreateResponse
err := client.ClientRPC("HostVolume.Create", req, &resp)
must.NoError(t, err)
must.Eq(t, cstructs.ClientHostVolumeCreateResponse{
HostPath: expectDir,
HostPath: hostPathCreate,
CapacityBytes: 0, // "mkdir" always returns zero
}, resp)
// technically this is testing "mkdir" more than the RPC
must.DirExists(t, expectDir)
must.DirExists(t, hostPathCreate)
// ensure we saved to client state
vols, err := memdb.GetDynamicHostVolumes()
must.NoError(t, err)
@@ -60,6 +62,7 @@ func TestHostVolume(t *testing.T) {
expectState := &cstructs.HostVolumeState{
ID: req.ID,
CreateReq: req,
HostPath: hostPathCreate,
}
must.Eq(t, expectState, vols[0])
// and should be fingerprinted
@@ -67,30 +70,89 @@ func TestHostVolume(t *testing.T) {
req.Name: {
ID: req.ID,
Name: req.Name,
Path: expectDir,
Path: hostPathCreate,
},
}, client.Node().HostVolumes)
/* register */
regReq := &cstructs.ClientHostVolumeRegisterRequest{
ID: "test-vol-id-2",
Name: "registered-volume",
NodeID: uuid.Generate(),
HostPath: hostPathRegister,
CapacityBytes: 1000,
}
var regResp cstructs.ClientHostVolumeRegisterResponse
err = client.ClientRPC("HostVolume.Register", regReq, &regResp)
must.NoError(t, err)
// ensure we saved to client state
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 2, vols)
sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID })
expectState = &cstructs.HostVolumeState{
ID: regReq.ID,
HostPath: hostPathRegister,
CreateReq: &cstructs.ClientHostVolumeCreateRequest{
ID: regReq.ID,
Name: regReq.Name,
NodeID: regReq.NodeID,
},
}
must.Eq(t, expectState, vols[1])
// and should be fingerprinted
must.Eq(t, hvm.VolumeMap{
req.Name: {
ID: req.ID,
Name: req.Name,
Path: hostPathCreate,
},
regReq.Name: {
ID: regReq.ID,
Name: regReq.Name,
Path: hostPathRegister,
},
}, client.Node().HostVolumes)
/* delete */
delReq := &cstructs.ClientHostVolumeDeleteRequest{
Name: "test-vol-name",
ID: "test-vol-id",
Name: "created-volume",
ID: "test-vol-id-1",
PluginID: "mkdir",
HostPath: expectDir,
HostPath: hostPathCreate,
}
var delResp cstructs.ClientHostVolumeDeleteResponse
err = client.ClientRPC("HostVolume.Delete", delReq, &delResp)
must.NoError(t, err)
must.NotNil(t, delResp)
// again, actually testing the "mkdir" plugin
must.DirNotExists(t, expectDir)
must.DirNotExists(t, hostPathCreate)
// client state should be deleted
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
must.Len(t, 1, vols)
// and the fingerprint, too
must.Eq(t, map[string]*structs.ClientHostVolumeConfig{}, client.Node().HostVolumes)
must.Eq(t, hvm.VolumeMap{
regReq.Name: {
ID: regReq.ID,
Name: regReq.Name,
Path: hostPathRegister,
},
}, client.Node().HostVolumes)
delReq.Name = "registered-volume"
delReq.ID = "test-vol-id-2"
err = client.ClientRPC("HostVolume.Delete", delReq, &delResp)
must.NoError(t, err)
must.NotNil(t, delResp)
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
must.Eq(t, hvm.VolumeMap{}, client.Node().HostVolumes)
})
t.Run("missing plugin", func(t *testing.T) {
@@ -119,20 +181,20 @@ func TestHostVolume(t *testing.T) {
})
req := &cstructs.ClientHostVolumeCreateRequest{
ID: "test-vol-id",
Name: "test-vol-name",
ID: "test-vol-id-1",
Name: "created-volume",
PluginID: "mkdir",
}
var resp cstructs.ClientHostVolumeCreateResponse
err := client.ClientRPC("HostVolume.Create", req, &resp)
must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id: not a directory")
must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id-1: not a directory")
delReq := &cstructs.ClientHostVolumeDeleteRequest{
ID: "test-vol-id",
ID: "test-vol-id-1",
PluginID: "mkdir",
}
var delResp cstructs.ClientHostVolumeDeleteResponse
err = client.ClientRPC("HostVolume.Delete", delReq, &delResp)
must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id: not a directory")
must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id-1: not a directory")
})
}

View File

@@ -6,6 +6,8 @@ package hostvolumemanager
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"
@@ -101,6 +103,7 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
volState := &cstructs.HostVolumeState{
ID: req.ID,
HostPath: pluginResp.Path,
CreateReq: req,
}
if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
@@ -120,10 +123,10 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
}
// free up the volume name whether delete succeeded or not.
hvm.locker.release(req.Name)
return nil, helper.FlattenMultierror(err)
return nil, err
}
hvm.updateNodeVols(req.Name, genVolConfig(req, pluginResp))
hvm.updateNodeVols(req.Name, genVolConfig(req, pluginResp.Path))
resp := &cstructs.ClientHostVolumeCreateResponse{
VolumeName: req.Name,
@@ -135,19 +138,59 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
return resp, nil
}
// Register saves the request to state, and updates the node with the volume.
func (hvm *HostVolumeManager) Register(ctx context.Context,
req *cstructs.ClientHostVolumeRegisterRequest) error {
// can't have two of the same volume name w/ different IDs per client node
if err := hvm.locker.lock(req.Name, req.ID); err != nil {
return err
}
_, err := os.Stat(req.HostPath)
if err != nil {
hvm.locker.release(req.Name)
return fmt.Errorf("could not verify host path for %q: %w", req.Name, err)
}
// generate a stub create request and plugin response for the fingerprint
// and client state
creq := &cstructs.ClientHostVolumeCreateRequest{
ID: req.ID,
Name: req.Name,
NodeID: req.NodeID,
Parameters: req.Parameters,
}
volState := &cstructs.HostVolumeState{
ID: req.ID,
CreateReq: creq,
HostPath: req.HostPath,
}
if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
hvm.log.Error("failed to save volume in state", "volume_id", req.ID, "error", err)
hvm.locker.release(req.Name)
return err
}
hvm.updateNodeVols(req.Name, genVolConfig(creq, req.HostPath))
return nil
}
// Delete runs the appropriate plugin for the given request, removes it from
// state, and updates the node to remove the volume.
func (hvm *HostVolumeManager) Delete(ctx context.Context,
req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) {
plug, err := hvm.getPlugin(req.PluginID)
if err != nil {
return nil, err
}
if req.PluginID != "" {
plug, err := hvm.getPlugin(req.PluginID)
if err != nil {
return nil, err
}
err = plug.Delete(ctx, req)
if err != nil {
return nil, err
err = plug.Delete(ctx, req)
if err != nil {
return nil, err
}
}
if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
@@ -193,41 +236,22 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap,
return volumes, nil // nothing to do
}
// re-"create" the volumes - plugins have the best knowledge of their
// side effects, and they must be idempotent.
group := multierror.Group{}
for _, vol := range vols {
group.Go(func() error {
// missing plugins with associated volumes in state are considered
// client-stopping errors. they need to be fixed by cluster admins.
plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
var volCfg *structs.ClientHostVolumeConfig
var err error
if vol.CreateReq.PluginID == "" {
volCfg, err = hvm.restoreForRegister(vol)
} else {
volCfg, err = hvm.restoreForCreate(ctx, vol)
}
if err != nil {
return err
}
// lock the name so future creates can't produce duplicates.
err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
// state should never have duplicate vol names, and restore happens
// prior to node registration, so new creates shouldn't come in
// concurrently, but check for error just in case.
if err != nil {
hvm.log.Error("error during restore",
"volume_name", vol.CreateReq.Name,
"volume_id", vol.CreateReq.ID,
"error", err)
// don't stop the world if it does happen, because an admin
// couldn't do anything about it short of wiping client state.
return nil
}
resp, err := plug.Create(ctx, vol.CreateReq)
if err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
return nil
}
mut.Lock()
volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
volumes[vol.CreateReq.Name] = volCfg
mut.Unlock()
return nil
})
@@ -236,13 +260,73 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap,
return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
}
// restoreForCreate restores a single volume that was previously created by
// Create, by "recreating" the volumes. Plugins have the best knowledge of their
// side effects, and they must be idempotent.
func (hvm *HostVolumeManager) restoreForCreate(ctx context.Context, vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) {
// missing plugins with associated volumes in state are considered
// client-stopping errors. they need to be fixed by cluster admins.
plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
if err != nil {
return nil, err
}
// lock the name so future creates can't produce duplicates.
err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
// state should never have duplicate vol names, and restore happens
// prior to node registration, so new creates shouldn't come in
// concurrently, but check for error just in case.
if err != nil {
hvm.log.Error("error during restore",
"volume_name", vol.CreateReq.Name,
"volume_id", vol.CreateReq.ID,
"error", err)
// don't stop the world if it does happen, because an admin
// couldn't do anything about it short of wiping client state.
return nil, nil
}
resp, err := plug.Create(ctx, vol.CreateReq)
if err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore",
"plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
return nil, nil
}
return genVolConfig(vol.CreateReq, resp.Path), nil
}
// restoreForRegister restores a single volume that was previously created by
// Register, by converting the stored struct. It otherwise behaves the same as
// restoreForCreate.
func (hvm *HostVolumeManager) restoreForRegister(vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) {
err := hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
if err != nil {
hvm.log.Error("error during restore",
"volume_name", vol.CreateReq.Name,
"volume_id", vol.CreateReq.ID,
"error", err)
return nil, nil
}
_, err = os.Stat(vol.HostPath)
if err != nil {
hvm.log.Error("failed to restore: could not verify host path",
"volume_id", vol.ID, "error", err, "path", vol.HostPath)
return nil, nil
}
return genVolConfig(vol.CreateReq, vol.HostPath), nil
}
// genVolConfig generates the host volume config for the node to report as
// available to the servers for job scheduling.
func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {
func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, hostPath string) *structs.ClientHostVolumeConfig {
return &structs.ClientHostVolumeConfig{
Name: req.Name,
ID: req.ID,
Path: resp.Path,
Path: hostPath,
// dynamic volumes, like CSI, have more robust `capabilities`,
// so we always set ReadOnly to false, and let the scheduler

View File

@@ -24,29 +24,31 @@ import (
func TestHostVolumeManager(t *testing.T) {
log := testlog.HCLogger(t)
tmp := t.TempDir()
errDB := &cstate.ErrDB{}
memDB := cstate.NewMemDB(log)
node := newFakeNode(t)
hostPathCreate := t.TempDir()
hostPathRegister := t.TempDir()
hvm := NewHostVolumeManager(log, Config{
PluginDir: "./test_fixtures",
SharedMountDir: tmp,
SharedMountDir: hostPathCreate,
StateMgr: errDB,
UpdateNodeVols: node.updateVol,
})
plug := &fakePlugin{mountDir: tmp}
plug := &fakePlugin{mountDir: hostPathCreate}
hvm.builtIns["test-plugin"] = plug
ctx := timeout(t)
t.Run("create", func(t *testing.T) {
// plugin doesn't exist
name := "vol-name"
name := "created-volume"
req := &cstructs.ClientHostVolumeCreateRequest{
Name: name,
ID: "vol-id",
ID: "vol-id-1",
PluginID: "nope",
RequestedCapacityMinBytes: 5,
@@ -73,7 +75,7 @@ func TestHostVolumeManager(t *testing.T) {
// error saving state, successful cleanup
_, err = hvm.Create(ctx, req)
must.ErrorIs(t, err, cstate.ErrDBError)
must.Eq(t, "vol-id", plug.deleted)
must.Eq(t, "vol-id-1", plug.deleted)
assertNotLocked(t, hvm, name)
plug.reset()
@@ -82,9 +84,9 @@ func TestHostVolumeManager(t *testing.T) {
resp, err := hvm.Create(ctx, req)
must.NoError(t, err)
expectResp := &cstructs.ClientHostVolumeCreateResponse{
VolumeName: "vol-name",
VolumeID: "vol-id",
HostPath: tmp,
VolumeName: "created-volume",
VolumeID: "vol-id-1",
HostPath: hostPathCreate,
CapacityBytes: 5,
}
must.Eq(t, expectResp, resp)
@@ -92,8 +94,8 @@ func TestHostVolumeManager(t *testing.T) {
must.NoError(t, err)
// should be saved to state
must.Len(t, 1, stateDBs)
must.Eq(t, "vol-id", stateDBs[0].ID)
must.Eq(t, "vol-id", stateDBs[0].CreateReq.ID)
must.Eq(t, "vol-id-1", stateDBs[0].ID)
must.Eq(t, "vol-id-1", stateDBs[0].CreateReq.ID)
// should be registered with node
must.MapContainsKey(t, node.vols, name, must.Sprintf("no %q in %+v", name, node.vols))
assertLocked(t, hvm, name)
@@ -114,16 +116,37 @@ func TestHostVolumeManager(t *testing.T) {
must.ErrorIs(t, err, ErrVolumeNameExists)
})
// despite being a subtest, this needs to run after "create"
t.Run("register", func(t *testing.T) {
name := "registered-volume"
req := &cstructs.ClientHostVolumeRegisterRequest{
ID: "vol-id-2",
Name: name,
HostPath: hostPathRegister,
CapacityBytes: 1000,
}
err := hvm.Register(ctx, req)
must.NoError(t, err)
// should be saved to state and registered with node
stateDBs, err := memDB.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 2, stateDBs)
must.Eq(t, "vol-id-2", stateDBs[1].ID)
must.Eq(t, "vol-id-2", stateDBs[1].CreateReq.ID)
must.MapContainsKey(t, node.vols, name, must.Sprintf("no %q in %+v", name, node.vols))
assertLocked(t, hvm, name)
})
// despite being a subtest, this needs to run after "create" and "register"
t.Run("delete", func(t *testing.T) {
name := "vol-name"
name := "created-volume"
// should be locked from "create" above
assertLocked(t, hvm, name)
// plugin doesn't exist
req := &cstructs.ClientHostVolumeDeleteRequest{
Name: name,
ID: "vol-id",
ID: "vol-id-1",
PluginID: "nope",
}
_, err := hvm.Delete(ctx, req)
@@ -145,27 +168,40 @@ func TestHostVolumeManager(t *testing.T) {
assertLocked(t, hvm, name)
// happy path
// add stuff that should be deleted
hvm.stateMgr = memDB
must.NoError(t, memDB.PutDynamicHostVolume(&cstructs.HostVolumeState{
ID: "vol-id",
CreateReq: &cstructs.ClientHostVolumeCreateRequest{},
}))
node.vols = VolumeMap{
"vol-name": &structs.ClientHostVolumeConfig{},
}
// and delete it
resp, err := hvm.Delete(ctx, req)
must.NoError(t, err)
must.Eq(t, &cstructs.ClientHostVolumeDeleteResponse{
VolumeName: "vol-name",
VolumeID: "vol-id",
VolumeName: "created-volume",
VolumeID: "vol-id-1",
}, resp)
must.Eq(t, VolumeMap{}, node.vols, must.Sprint("vols should be deleted from node"))
must.Eq(t, VolumeMap{
"registered-volume": &structs.ClientHostVolumeConfig{
Name: "registered-volume",
Path: hostPathRegister,
ID: "vol-id-2",
},
}, node.vols, must.Sprint("created-volume should be deleted from node"))
stateVols, err := memDB.GetDynamicHostVolumes()
must.NoError(t, err)
must.Nil(t, stateVols, must.Sprint("vols should be deleted from state"))
must.Len(t, 1, stateVols, must.Sprint("only one volume should be deleted"))
assertNotLocked(t, hvm, name)
assertLocked(t, hvm, "registered-volume")
req.Name = "registered-volume"
req.ID = "vol-id-2"
req.PluginID = ""
resp, err = hvm.Delete(ctx, req)
must.NoError(t, err)
must.Eq(t, VolumeMap{}, node.vols, must.Sprint("all volumes should be deleted from node"))
stateVols, err = memDB.GetDynamicHostVolumes()
must.NoError(t, err)
must.Nil(t, stateVols, must.Sprint("all volumes should be deleted"))
assertNotLocked(t, hvm, "registered-volume")
})
}
@@ -223,14 +259,26 @@ func assertNotLocked(t *testing.T, hvm *HostVolumeManager, name string) {
func TestHostVolumeManager_restoreFromState(t *testing.T) {
log := testlog.HCLogger(t)
vol := &cstructs.HostVolumeState{
ID: "test-vol-id",
hostPath := t.TempDir()
vol1 := &cstructs.HostVolumeState{
ID: "test-vol-id-1",
CreateReq: &cstructs.ClientHostVolumeCreateRequest{
Name: "test-vol-name",
ID: "test-vol-id",
Name: "created-volume",
ID: "test-vol-id-1",
PluginID: "mkdir",
},
}
vol2 := &cstructs.HostVolumeState{
ID: "test-vol-id-2",
HostPath: hostPath,
CreateReq: &cstructs.ClientHostVolumeCreateRequest{
Name: "registered-volume",
ID: "test-vol-id-2",
PluginID: "", // this signifies a Register operation
},
}
node := newFakeNode(t)
t.Run("no vols", func(t *testing.T) {
@@ -247,12 +295,13 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {
t.Run("happy", func(t *testing.T) {
// put our volume in state
state := cstate.NewMemDB(log)
must.NoError(t, state.PutDynamicHostVolume(vol))
must.NoError(t, state.PutDynamicHostVolume(vol1))
must.NoError(t, state.PutDynamicHostVolume(vol2))
// new volume manager should load it from state and run Create,
// resulting in a volume directory in this mountDir.
mountDir := t.TempDir()
volPath := filepath.Join(mountDir, vol.ID)
volPath := filepath.Join(mountDir, vol1.ID)
hvm := NewHostVolumeManager(log, Config{
StateMgr: state,
@@ -265,18 +314,25 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) {
must.NoError(t, err)
expect := map[string]*structs.ClientHostVolumeConfig{
"test-vol-name": {
Name: "test-vol-name",
ID: "test-vol-id",
"created-volume": {
Name: "created-volume",
ID: "test-vol-id-1",
Path: volPath,
ReadOnly: false,
},
"registered-volume": {
Name: "registered-volume",
ID: "test-vol-id-2",
Path: hostPath,
ReadOnly: false,
},
}
must.Eq(t, expect, vols)
must.DirExists(t, volPath)
assertLocked(t, hvm, "test-vol-name")
assertLocked(t, hvm, "created-volume")
assertLocked(t, hvm, "registered-volume")
})
t.Run("state error", func(t *testing.T) {

View File

@@ -5,6 +5,7 @@ package structs
type HostVolumeState struct {
ID string
HostPath string
CreateReq *ClientHostVolumeCreateRequest
}
@@ -40,8 +41,8 @@ type ClientHostVolumeCreateResponse struct {
VolumeName string
VolumeID string
// Path is the host path where the volume's mount point was created. We send
// this back to the server to make debugging easier.
// HostPath is the host path where the volume's mount point was created. We
// send this back to the server to make debugging easier.
HostPath string
// Capacity is the size in bytes that was actually provisioned by the host
@@ -49,6 +50,33 @@ type ClientHostVolumeCreateResponse struct {
CapacityBytes int64
}
type ClientHostVolumeRegisterRequest struct {
// ID is a UUID-like string generated by the server.
ID string
// Name is the name that group.volume will use to identify the volume
// source. Not expected to be unique cluster-wide, but must be unique per
// node.
Name string
// NodeID is the node where the volume is placed. It's included in the
// client RPC request so that the server can route the request to the
// correct node.
NodeID string
// HostPath is the host path where the volume's mount point was created
// out-of-band.
HostPath string
// Capacity is the size in bytes that was provisioned out-of-band.
CapacityBytes int64
// Parameters are an opaque map of parameters for the host volume plugin.
Parameters map[string]string
}
type ClientHostVolumeRegisterResponse struct{}
type ClientHostVolumeDeleteRequest struct {
// ID is a UUID-like string generated by the server.
ID string

View File

@@ -206,6 +206,7 @@ func decodeHostVolume(input *ast.File) (*api.HostVolume, error) {
// Need to manually parse these fields/blocks
delete(m, "capability")
delete(m, "constraint")
delete(m, "capacity")
delete(m, "capacity_max")
delete(m, "capacity_min")
delete(m, "type")
@@ -216,6 +217,11 @@ func decodeHostVolume(input *ast.File) (*api.HostVolume, error) {
return nil, err
}
capacity, err := parseCapacityBytes(list.Filter("capacity"))
if err != nil {
return nil, fmt.Errorf("invalid capacity: %v", err)
}
vol.CapacityBytes = capacity
capacityMin, err := parseCapacityBytes(list.Filter("capacity_min"))
if err != nil {
return nil, fmt.Errorf("invalid capacity_min: %v", err)

View File

@@ -30,6 +30,7 @@ func TestHostVolumeDeleteCommand(t *testing.T) {
must.NoError(t, err)
must.Len(t, 1, nodes)
nodeID := nodes[0].ID
hostPath := t.TempDir()
ui := cli.NewMockUi()
@@ -37,14 +38,15 @@ func TestHostVolumeDeleteCommand(t *testing.T) {
namespace = "prod"
name = "example"
type = "host"
plugin_id = "mkdir"
node_id = "%s"
node_pool = "default"
host_path = "%s"
capability {
access_mode = "single-node-reader-only"
attachment_mode = "file-system"
}
`, nodeID)
`, nodeID, hostPath)
file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl")
must.NoError(t, err)

View File

@@ -30,6 +30,8 @@ func TestHostVolumeRegisterCommand_Run(t *testing.T) {
must.Len(t, 1, nodes)
nodeID := nodes[0].ID
hostPath := t.TempDir()
ui := cli.NewMockUi()
cmd := &VolumeRegisterCommand{Meta: Meta{Ui: ui}}
@@ -40,9 +42,9 @@ type = "host"
plugin_id = "plugin_id"
node_id = "%s"
node_pool = "default"
host_path = "%s"
capacity = 150000000
host_path = "/var/nomad/alloc_mounts/example"
capacity = "15GB"
capacity_min = "10GiB"
capacity_max = "20G"
@@ -69,7 +71,7 @@ capability {
parameters {
foo = "bar"
}
`, nodeID)
`, nodeID, hostPath)
file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl")
must.NoError(t, err)

View File

@@ -109,6 +109,7 @@ func TestHostVolumeStatusCommand_Get(t *testing.T) {
must.NoError(t, err)
must.Len(t, 1, nodes)
nodeID := nodes[0].ID
hostPath := t.TempDir()
ui := cli.NewMockUi()
@@ -119,11 +120,12 @@ type = "host"
plugin_id = "mkdir"
node_id = "%s"
node_pool = "default"
host_path = "%s"
capability {
access_mode = "single-node-reader-only"
attachment_mode = "file-system"
}
`, nodeID)
`, nodeID, hostPath)
file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl")
must.NoError(t, err)

View File

@@ -0,0 +1,15 @@
# Copyright (c) HashiCorp, Inc.
# SPDX-License-Identifier: BUSL-1.1
name = "no-plugin"
type = "host"
# this volume spec can be used with 'volume register' after filling in the
# host_path and node_id values below
host_path = "TODO" # absolute path of the volume that was created out-of-band
node_id = "TODO" # ID of the node where the volume was created
capability {
access_mode = "single-node-writer"
attachment_mode = "file-system"
}

View File

@@ -36,6 +36,18 @@ func (c *ClientHostVolume) Create(args *cstructs.ClientHostVolumeCreateRequest,
)
}
func (c *ClientHostVolume) Register(args *cstructs.ClientHostVolumeRegisterRequest, reply *cstructs.ClientHostVolumeRegisterResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_host_volume", "register"}, time.Now())
return c.sendVolumeRPC(
args.NodeID,
"HostVolume.Register",
"ClientHostVolume.Register",
structs.RateMetricWrite,
args,
reply,
)
}
func (c *ClientHostVolume) Delete(args *cstructs.ClientHostVolumeDeleteRequest, reply *cstructs.ClientHostVolumeDeleteResponse) error {
defer metrics.MeasureSince([]string{"nomad", "client_host_volume", "delete"}, time.Now())
return c.sendVolumeRPC(

View File

@@ -230,17 +230,25 @@ func (v *HostVolume) Create(args *structs.HostVolumeCreateRequest, reply *struct
// ensure we only try to create a valid volume or make valid updates to a
// volume
now := time.Now()
snap, err := v.srv.State().Snapshot()
if err != nil {
return err
}
vol, err = v.validateVolumeUpdate(vol, snap, now)
existing, err := v.validateVolumeUpdate(vol, snap)
if err != nil {
return err
}
// set zero values as needed, possibly from existing
now := time.Now()
vol.CanonicalizeForCreate(existing, now)
// make sure any nodes or pools actually exist
err = v.validateVolumeForState(vol, snap)
if err != nil {
return fmt.Errorf("validating volume %q against state failed: %v", vol.Name, err)
}
_, err = v.placeHostVolume(snap, vol)
if err != nil {
return fmt.Errorf("could not place volume %q: %w", vol.Name, err)
@@ -320,13 +328,25 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st
if vol.NodeID == "" {
return errors.New("cannot register volume: node ID is required")
}
if vol.HostPath == "" {
return errors.New("cannot register volume: host path is required")
}
now := time.Now()
vol, err = v.validateVolumeUpdate(vol, snap, now)
existing, err := v.validateVolumeUpdate(vol, snap)
if err != nil {
return err
}
// set zero values as needed, possibly from existing
now := time.Now()
vol.CanonicalizeForRegister(existing, now)
// make sure any nodes or pools actually exist
err = v.validateVolumeForState(vol, snap)
if err != nil {
return fmt.Errorf("validating volume %q against state failed: %v", vol.ID, err)
}
warn, err := v.enforceEnterprisePolicy(
snap, vol, args.GetIdentity().GetACLToken(), args.PolicyOverride)
if warn != nil {
@@ -336,6 +356,15 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st
return err
}
// Attempt to register the volume on the client.
//
// NOTE: registering the volume on the client via the plugin can't be made
// atomic with the registration.
err = v.registerVolume(vol)
if err != nil {
return err
}
// Write a newly created or modified volume to raft. We create a new request
// here because we've likely mutated the volume.
_, index, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType,
@@ -354,9 +383,7 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st
}
func (v *HostVolume) validateVolumeUpdate(
vol *structs.HostVolume,
snap *state.StateSnapshot,
now time.Time) (*structs.HostVolume, error) {
vol *structs.HostVolume, snap *state.StateSnapshot) (*structs.HostVolume, error) {
// validate the volume spec
err := vol.Validate()
@@ -366,7 +393,6 @@ func (v *HostVolume) validateVolumeUpdate(
// validate any update we're making
var existing *structs.HostVolume
volID := vol.ID
if vol.ID != "" {
existing, err = snap.HostVolumeByID(nil, vol.Namespace, vol.ID, true)
if err != nil {
@@ -374,27 +400,13 @@ func (v *HostVolume) validateVolumeUpdate(
}
if existing == nil {
return nil, fmt.Errorf("cannot update volume %q: volume does not exist", vol.ID)
}
err = vol.ValidateUpdate(existing)
if err != nil {
return nil, fmt.Errorf("validating volume %q update failed: %v", vol.ID, err)
return existing, fmt.Errorf("validating volume %q update failed: %v", vol.ID, err)
}
} else {
// capture this for nicer error messages later
volID = vol.Name
}
// set zero values as needed, possibly from existing
vol.CanonicalizeForUpdate(existing, now)
// make sure any nodes or pools actually exist
err = v.validateVolumeForState(vol, snap)
if err != nil {
return nil, fmt.Errorf("validating volume %q against state failed: %v", volID, err)
}
return vol, nil
return existing, nil
}
// validateVolumeForState ensures that any references to node IDs or node pools are valid
@@ -455,6 +467,30 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error {
return nil
}
func (v *HostVolume) registerVolume(vol *structs.HostVolume) error {
method := "ClientHostVolume.Register"
cReq := &cstructs.ClientHostVolumeRegisterRequest{
ID: vol.ID,
Name: vol.Name,
NodeID: vol.NodeID,
HostPath: vol.HostPath,
CapacityBytes: vol.CapacityBytes,
Parameters: vol.Parameters,
}
cResp := &cstructs.ClientHostVolumeRegisterResponse{}
err := v.srv.RPC(method, cReq, cResp)
if err != nil {
return err
}
if vol.State == structs.HostVolumeStateUnknown {
vol.State = structs.HostVolumeStatePending
}
return nil
}
// placeHostVolume adds a node to volumes that don't already have one. The
// selected node will match the node pool and constraints and must not already
// have a volume by that name. It returns the node (for testing) and an error indicating

View File

@@ -20,7 +20,6 @@ import (
"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/hashicorp/nomad/version"
@@ -423,31 +422,25 @@ func TestHostVolumeEndpoint_List(t *testing.T) {
nspace2.Name = ns2
must.NoError(t, store.UpsertNamespaces(index, []*structs.Namespace{nspace1, nspace2}))
nodes := []*structs.Node{
mock.Node(),
mock.Node(),
mock.Node(),
}
nodes[2].NodePool = "prod"
index++
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
index, nodes[0], state.NodeUpsertWithNodePool))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
index, nodes[1], state.NodeUpsertWithNodePool))
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
index, nodes[2], state.NodeUpsertWithNodePool))
_, node0 := newMockHostVolumeClient(t, srv, "default")
_, node1 := newMockHostVolumeClient(t, srv, "default")
_, node2 := newMockHostVolumeClient(t, srv, "prod")
vol1 := mock.HostVolumeRequestForNode(ns1, nodes[0])
vol1 := mock.HostVolumeRequestForNode(ns1, node0)
vol1.Name = "foobar-example"
vol1.HostPath = "/tmp/vol1"
vol2 := mock.HostVolumeRequestForNode(ns1, nodes[1])
vol2 := mock.HostVolumeRequestForNode(ns1, node1)
vol2.Name = "foobaz-example"
vol2.HostPath = "/tmp/vol2"
vol3 := mock.HostVolumeRequestForNode(ns2, nodes[2])
vol3 := mock.HostVolumeRequestForNode(ns2, node2)
vol3.Name = "foobar-example"
vol3.HostPath = "/tmp/vol3"
vol4 := mock.HostVolumeRequestForNode(ns2, nodes[1])
vol4 := mock.HostVolumeRequestForNode(ns2, node1)
vol4.Name = "foobaz-example"
vol4.HostPath = "/tmp/vol4"
// we need to register these rather than upsert them so we have the correct
// indexes for unblocking later.
@@ -538,7 +531,7 @@ func TestHostVolumeEndpoint_List(t *testing.T) {
{
name: "query by node",
req: &structs.HostVolumeListRequest{
NodeID: nodes[1].ID,
NodeID: node1.ID,
QueryOptions: structs.QueryOptions{
Region: srv.Region(),
Namespace: structs.AllNamespacesSentinel,
@@ -562,7 +555,7 @@ func TestHostVolumeEndpoint_List(t *testing.T) {
{
name: "query by incompatible node ID and pool",
req: &structs.HostVolumeListRequest{
NodeID: nodes[1].ID,
NodeID: node1.ID,
NodePool: "prod",
QueryOptions: structs.QueryOptions{
Region: srv.Region(),
@@ -776,6 +769,7 @@ type mockHostVolumeClient struct {
lock sync.Mutex
nextCreateResponse *cstructs.ClientHostVolumeCreateResponse
nextCreateErr error
nextRegisterErr error
nextDeleteErr error
}
@@ -837,6 +831,15 @@ func (v *mockHostVolumeClient) Create(
return v.nextCreateErr
}
func (v *mockHostVolumeClient) Register(
req *cstructs.ClientHostVolumeRegisterRequest,
resp *cstructs.ClientHostVolumeRegisterResponse) error {
v.lock.Lock()
defer v.lock.Unlock()
*resp = cstructs.ClientHostVolumeRegisterResponse{}
return v.nextRegisterErr
}
func (v *mockHostVolumeClient) Delete(
req *cstructs.ClientHostVolumeDeleteRequest,
resp *cstructs.ClientHostVolumeDeleteResponse) error {

View File

@@ -207,11 +207,11 @@ func (hv *HostVolume) ValidateUpdate(existing *HostVolume) error {
const DefaultHostVolumePlugin = "default"
// CanonicalizeForUpdate is called in the RPC handler to ensure we call client
// CanonicalizeForCreate is called in the RPC handler to ensure we call client
// RPCs with correctly populated fields from the existing volume, even if the
// RPC request includes otherwise valid zero-values. This method should be
// called on request objects or a copy, never on a state store object directly.
func (hv *HostVolume) CanonicalizeForUpdate(existing *HostVolume, now time.Time) {
func (hv *HostVolume) CanonicalizeForCreate(existing *HostVolume, now time.Time) {
if existing == nil {
hv.ID = uuid.Generate()
if hv.PluginID == "" {
@@ -235,6 +235,27 @@ func (hv *HostVolume) CanonicalizeForUpdate(existing *HostVolume, now time.Time)
hv.Allocations = nil // set on read only
}
// CanonicalizeForRegister is called in the RPC handler to ensure we call client
// RPCs with correctly populated fields from the existing volume, even if the
// RPC request includes otherwise valid zero-values. This method should be
// called on request objects or a copy, never on a state store object directly.
func (hv *HostVolume) CanonicalizeForRegister(existing *HostVolume, now time.Time) {
if existing == nil {
hv.ID = uuid.Generate()
hv.CreateTime = now.UnixNano()
} else {
hv.PluginID = existing.PluginID
hv.NodePool = existing.NodePool
hv.NodeID = existing.NodeID
hv.Constraints = existing.Constraints
hv.CreateTime = existing.CreateTime
}
hv.State = HostVolumeStatePending // reset on any change
hv.ModifyTime = now.UnixNano()
hv.Allocations = nil // set on read only
}
// GetNamespace implements the paginator.NamespaceGetter interface
func (hv *HostVolume) GetNamespace() string {
return hv.Namespace

View File

@@ -169,7 +169,7 @@ func TestHostVolume_ValidateUpdate(t *testing.T) {
}
func TestHostVolume_CanonicalizeForUpdate(t *testing.T) {
func TestHostVolume_Canonicalize(t *testing.T) {
now := time.Now()
vol := &HostVolume{
CapacityBytes: 100000,
@@ -179,7 +179,7 @@ func TestHostVolume_CanonicalizeForUpdate(t *testing.T) {
{ID: "7032e570"},
},
}
vol.CanonicalizeForUpdate(nil, now)
vol.CanonicalizeForCreate(nil, now)
must.NotEq(t, "", vol.ID)
must.Eq(t, now.UnixNano(), vol.CreateTime)
@@ -224,7 +224,7 @@ func TestHostVolume_CanonicalizeForUpdate(t *testing.T) {
CreateTime: 1,
}
vol.CanonicalizeForUpdate(existing, now)
vol.CanonicalizeForCreate(existing, now)
must.Eq(t, existing.ID, vol.ID)
must.Eq(t, existing.PluginID, vol.PluginID)
must.Eq(t, existing.NodePool, vol.NodePool)