From 4a65b21aab901bfaa0a2195ec0877fdaec8a3c66 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Wed, 8 Jan 2025 16:58:58 -0500 Subject: [PATCH] dynamic host volumes: send register to client for fingerprint (#24802) When we register a volume without a plugin, we need to send a client RPC so that the node fingerprint can be updated. The registered volume also needs to be written to client state so that we can restore the fingerprint after a restart. Co-authored-by: Daniel Bennett --- api/host_volumes.go | 2 +- client/host_volume_endpoint.go | 18 ++ client/host_volume_endpoint_test.go | 98 +++++++++-- client/hostvolumemanager/host_volumes.go | 160 +++++++++++++----- client/hostvolumemanager/host_volumes_test.go | 128 ++++++++++---- client/structs/host_volumes.go | 32 +++- command/volume_create_host.go | 6 + command/volume_delete_host_test.go | 6 +- command/volume_register_host_test.go | 8 +- command/volume_status_host_test.go | 4 +- demo/hostvolume/no-plugin.volume.hcl | 15 ++ nomad/client_host_volume_endpoint.go | 12 ++ nomad/host_volume_endpoint.go | 86 +++++++--- nomad/host_volume_endpoint_test.go | 43 ++--- nomad/structs/host_volumes.go | 25 ++- nomad/structs/host_volumes_test.go | 6 +- 16 files changed, 498 insertions(+), 151 deletions(-) create mode 100644 demo/hostvolume/no-plugin.volume.hcl diff --git a/api/host_volumes.go b/api/host_volumes.go index 661ac77c1..5a7444f28 100644 --- a/api/host_volumes.go +++ b/api/host_volumes.go @@ -48,7 +48,7 @@ type HostVolume struct { // updated on the client RequestedCapacityMinBytes int64 `mapstructure:"capacity_min" hcl:"capacity_min"` RequestedCapacityMaxBytes int64 `mapstructure:"capacity_max" hcl:"capacity_max"` - CapacityBytes int64 + CapacityBytes int64 `mapstructure:"capacity" hcl:"capacity"` // RequestedCapabilities defines the options available to group.volume // blocks. 
The scheduler checks against the listed capability blocks and diff --git a/client/host_volume_endpoint.go b/client/host_volume_endpoint.go index 5c73ee4c3..491a548bf 100644 --- a/client/host_volume_endpoint.go +++ b/client/host_volume_endpoint.go @@ -43,6 +43,24 @@ func (v *HostVolume) Create( return nil } +func (v *HostVolume) Register( + req *cstructs.ClientHostVolumeRegisterRequest, + resp *cstructs.ClientHostVolumeRegisterResponse) error { + + defer metrics.MeasureSince([]string{"client", "host_volume", "register"}, time.Now()) + ctx, cancelFn := v.requestContext() + defer cancelFn() + + err := v.c.hostVolumeManager.Register(ctx, req) + if err != nil { + v.c.logger.Error("failed to register host volume", "name", req.Name, "error", err) + return err + } + + v.c.logger.Info("registered host volume", "id", req.ID, "path", req.HostPath) + return nil +} + func (v *HostVolume) Delete( req *cstructs.ClientHostVolumeDeleteRequest, resp *cstructs.ClientHostVolumeDeleteResponse) error { diff --git a/client/host_volume_endpoint_test.go b/client/host_volume_endpoint_test.go index a85eae0b1..71d8082a4 100644 --- a/client/host_volume_endpoint_test.go +++ b/client/host_volume_endpoint_test.go @@ -5,6 +5,7 @@ package client import ( "path/filepath" + "sort" "testing" "github.com/hashicorp/nomad/ci" @@ -12,7 +13,7 @@ import ( "github.com/hashicorp/nomad/client/state" cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/testlog" - "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/helper/uuid" "github.com/shoenig/test/must" ) @@ -33,26 +34,27 @@ func TestHostVolume(t *testing.T) { SharedMountDir: tmp, }) client.hostVolumeManager = manager - expectDir := filepath.Join(tmp, "test-vol-id") + hostPathCreate := filepath.Join(tmp, "test-vol-id-1") + hostPathRegister := t.TempDir() t.Run("happy", func(t *testing.T) { /* create */ req := &cstructs.ClientHostVolumeCreateRequest{ - Name: "test-vol-name", - ID: "test-vol-id", + Name: "created-volume", + ID: "test-vol-id-1", PluginID: "mkdir", // real plugin really makes a dir } var resp cstructs.ClientHostVolumeCreateResponse err := client.ClientRPC("HostVolume.Create", req, &resp) must.NoError(t, err) must.Eq(t, cstructs.ClientHostVolumeCreateResponse{ - HostPath: expectDir, + HostPath: hostPathCreate, CapacityBytes: 0, // "mkdir" always returns zero }, resp) // technically this is testing "mkdir" more than the RPC - must.DirExists(t, expectDir) + must.DirExists(t, hostPathCreate) // ensure we saved to client state vols, err := memdb.GetDynamicHostVolumes() must.NoError(t, err) @@ -60,6 +62,7 @@ func TestHostVolume(t *testing.T) { expectState := &cstructs.HostVolumeState{ ID: req.ID, CreateReq: req, + HostPath: hostPathCreate, } must.Eq(t, expectState, vols[0]) // and should be fingerprinted @@ -67,30 +70,89 @@ func TestHostVolume(t *testing.T) { req.Name: { ID: req.ID, Name: req.Name, - Path: expectDir, + Path: hostPathCreate, + }, + }, client.Node().HostVolumes) + + /* register */ + + regReq := &cstructs.ClientHostVolumeRegisterRequest{ + ID: "test-vol-id-2", + Name: "registered-volume", + NodeID: uuid.Generate(), + HostPath: hostPathRegister, + CapacityBytes: 1000, + } + var regResp cstructs.ClientHostVolumeRegisterResponse + err = client.ClientRPC("HostVolume.Register", regReq, ®Resp) + must.NoError(t, err) + + // ensure we saved to client state + vols, err = memdb.GetDynamicHostVolumes() + must.NoError(t, err) + must.Len(t, 2, vols) + sort.Slice(vols, func(i, j int) bool { return vols[i].ID < 
vols[j].ID }) + expectState = &cstructs.HostVolumeState{ + ID: regReq.ID, + HostPath: hostPathRegister, + CreateReq: &cstructs.ClientHostVolumeCreateRequest{ + ID: regReq.ID, + Name: regReq.Name, + NodeID: regReq.NodeID, + }, + } + must.Eq(t, expectState, vols[1]) + // and should be fingerprinted + must.Eq(t, hvm.VolumeMap{ + req.Name: { + ID: req.ID, + Name: req.Name, + Path: hostPathCreate, + }, + regReq.Name: { + ID: regReq.ID, + Name: regReq.Name, + Path: hostPathRegister, }, }, client.Node().HostVolumes) /* delete */ delReq := &cstructs.ClientHostVolumeDeleteRequest{ - Name: "test-vol-name", - ID: "test-vol-id", + Name: "created-volume", + ID: "test-vol-id-1", PluginID: "mkdir", - HostPath: expectDir, + HostPath: hostPathCreate, } var delResp cstructs.ClientHostVolumeDeleteResponse err = client.ClientRPC("HostVolume.Delete", delReq, &delResp) must.NoError(t, err) must.NotNil(t, delResp) // again, actually testing the "mkdir" plugin - must.DirNotExists(t, expectDir) + must.DirNotExists(t, hostPathCreate) // client state should be deleted vols, err = memdb.GetDynamicHostVolumes() must.NoError(t, err) - must.Len(t, 0, vols) + must.Len(t, 1, vols) // and the fingerprint, too - must.Eq(t, map[string]*structs.ClientHostVolumeConfig{}, client.Node().HostVolumes) + must.Eq(t, hvm.VolumeMap{ + regReq.Name: { + ID: regReq.ID, + Name: regReq.Name, + Path: hostPathRegister, + }, + }, client.Node().HostVolumes) + + delReq.Name = "registered-volume" + delReq.ID = "test-vol-id-2" + err = client.ClientRPC("HostVolume.Delete", delReq, &delResp) + must.NoError(t, err) + must.NotNil(t, delResp) + + vols, err = memdb.GetDynamicHostVolumes() + must.NoError(t, err) + must.Len(t, 0, vols) + must.Eq(t, hvm.VolumeMap{}, client.Node().HostVolumes) }) t.Run("missing plugin", func(t *testing.T) { @@ -119,20 +181,20 @@ func TestHostVolume(t *testing.T) { }) req := &cstructs.ClientHostVolumeCreateRequest{ - ID: "test-vol-id", - Name: "test-vol-name", + ID: "test-vol-id-1", + Name: "created-volume", PluginID: "mkdir", } var resp cstructs.ClientHostVolumeCreateResponse err := client.ClientRPC("HostVolume.Create", req, &resp) - must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id: not a directory") + must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id-1: not a directory") delReq := &cstructs.ClientHostVolumeDeleteRequest{ - ID: "test-vol-id", + ID: "test-vol-id-1", PluginID: "mkdir", } var delResp cstructs.ClientHostVolumeDeleteResponse err = client.ClientRPC("HostVolume.Delete", delReq, &delResp) - must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id: not a directory") + must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id-1: not a directory") }) } diff --git a/client/hostvolumemanager/host_volumes.go b/client/hostvolumemanager/host_volumes.go index 0516e5c1c..c2681ce8d 100644 --- a/client/hostvolumemanager/host_volumes.go +++ b/client/hostvolumemanager/host_volumes.go @@ -6,6 +6,8 @@ package hostvolumemanager import ( "context" "errors" + "fmt" + "os" "path/filepath" "sync" @@ -101,6 +103,7 @@ func (hvm *HostVolumeManager) Create(ctx context.Context, volState := &cstructs.HostVolumeState{ ID: req.ID, + HostPath: pluginResp.Path, CreateReq: req, } if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil { @@ -120,10 +123,10 @@ func (hvm *HostVolumeManager) Create(ctx context.Context, } // free up the volume name whether delete succeeded or not. 
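// Aside: a minimal sketch of the per-node name->ID lock being released just below,
// assuming a hypothetical volNameLocker type (the real locker lives elsewhere in
// this package) and "fmt"/"sync" imports in a scratch package. Each client node
// may hold a volume name only once, so Create and Register take the lock up front
// and release it on any failure.
type volNameLocker struct {
	mu    sync.Mutex
	names map[string]string // volume name -> volume ID
}

func (l *volNameLocker) lock(name, id string) error {
	l.mu.Lock()
	defer l.mu.Unlock()
	if l.names == nil {
		l.names = make(map[string]string)
	}
	if held, ok := l.names[name]; ok && held != id {
		return fmt.Errorf("volume name %q already exists on this host with ID %s", name, held)
	}
	l.names[name] = id
	return nil
}

func (l *volNameLocker) release(name string) {
	l.mu.Lock()
	defer l.mu.Unlock()
	delete(l.names, name)
}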
hvm.locker.release(req.Name) - return nil, helper.FlattenMultierror(err) + return nil, err } - hvm.updateNodeVols(req.Name, genVolConfig(req, pluginResp)) + hvm.updateNodeVols(req.Name, genVolConfig(req, pluginResp.Path)) resp := &cstructs.ClientHostVolumeCreateResponse{ VolumeName: req.Name, @@ -135,19 +138,59 @@ func (hvm *HostVolumeManager) Create(ctx context.Context, return resp, nil } +// Register saves the request to state, and updates the node with the volume. +func (hvm *HostVolumeManager) Register(ctx context.Context, + req *cstructs.ClientHostVolumeRegisterRequest) error { + + // can't have two of the same volume name w/ different IDs per client node + if err := hvm.locker.lock(req.Name, req.ID); err != nil { + return err + } + + _, err := os.Stat(req.HostPath) + if err != nil { + hvm.locker.release(req.Name) + return fmt.Errorf("could not verify host path for %q: %w", req.Name, err) + } + + // generate a stub create request and plugin response for the fingerprint + // and client state + creq := &cstructs.ClientHostVolumeCreateRequest{ + ID: req.ID, + Name: req.Name, + NodeID: req.NodeID, + Parameters: req.Parameters, + } + volState := &cstructs.HostVolumeState{ + ID: req.ID, + CreateReq: creq, + HostPath: req.HostPath, + } + if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil { + hvm.log.Error("failed to save volume in state", "volume_id", req.ID, "error", err) + hvm.locker.release(req.Name) + return err + } + + hvm.updateNodeVols(req.Name, genVolConfig(creq, req.HostPath)) + return nil +} + // Delete runs the appropriate plugin for the given request, removes it from // state, and updates the node to remove the volume. func (hvm *HostVolumeManager) Delete(ctx context.Context, req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) { - plug, err := hvm.getPlugin(req.PluginID) - if err != nil { - return nil, err - } + if req.PluginID != "" { + plug, err := hvm.getPlugin(req.PluginID) + if err != nil { + return nil, err + } - err = plug.Delete(ctx, req) - if err != nil { - return nil, err + err = plug.Delete(ctx, req) + if err != nil { + return nil, err + } } if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil { @@ -193,41 +236,22 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, return volumes, nil // nothing to do } - // re-"create" the volumes - plugins have the best knowledge of their - // side effects, and they must be idempotent. group := multierror.Group{} for _, vol := range vols { group.Go(func() error { - // missing plugins with associated volumes in state are considered - // client-stopping errors. they need to be fixed by cluster admins. - plug, err := hvm.getPlugin(vol.CreateReq.PluginID) + var volCfg *structs.ClientHostVolumeConfig + var err error + if vol.CreateReq.PluginID == "" { + volCfg, err = hvm.restoreForRegister(vol) + } else { + volCfg, err = hvm.restoreForCreate(ctx, vol) + } if err != nil { return err } - // lock the name so future creates can't produce duplicates. - err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID) - // state should never have duplicate vol names, and restore happens - // prior to node registration, so new creates shouldn't come in - // concurrently, but check for error just in case. 
- if err != nil { - hvm.log.Error("error during restore", - "volume_name", vol.CreateReq.Name, - "volume_id", vol.CreateReq.ID, - "error", err) - // don't stop the world if it does happen, because an admin - // couldn't do anything about it short of wiping client state. - return nil - } - - resp, err := plug.Create(ctx, vol.CreateReq) - if err != nil { - // plugin execution errors are only logged - hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err) - return nil - } mut.Lock() - volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp) + volumes[vol.CreateReq.Name] = volCfg mut.Unlock() return nil }) @@ -236,13 +260,73 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap, return volumes, helper.FlattenMultierror(mErr.ErrorOrNil()) } +// restoreForCreate restores a single volume that was previously created by +// Create, by "recreating" the volumes. Plugins have the best knowledge of their +// side effects, and they must be idempotent. +func (hvm *HostVolumeManager) restoreForCreate(ctx context.Context, vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) { + // missing plugins with associated volumes in state are considered + // client-stopping errors. they need to be fixed by cluster admins. + plug, err := hvm.getPlugin(vol.CreateReq.PluginID) + if err != nil { + return nil, err + } + + // lock the name so future creates can't produce duplicates. + err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID) + // state should never have duplicate vol names, and restore happens + // prior to node registration, so new creates shouldn't come in + // concurrently, but check for error just in case. + if err != nil { + hvm.log.Error("error during restore", + "volume_name", vol.CreateReq.Name, + "volume_id", vol.CreateReq.ID, + "error", err) + // don't stop the world if it does happen, because an admin + // couldn't do anything about it short of wiping client state. + return nil, nil + } + + resp, err := plug.Create(ctx, vol.CreateReq) + if err != nil { + // plugin execution errors are only logged + hvm.log.Error("failed to restore", + "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err) + return nil, nil + } + + return genVolConfig(vol.CreateReq, resp.Path), nil +} + +// restoreForRegister restores a single volume that was previously created by +// Register, by converting the stored struct. It otherwise behaves the same as +// restoreForCreate. +func (hvm *HostVolumeManager) restoreForRegister(vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) { + err := hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID) + if err != nil { + hvm.log.Error("error during restore", + "volume_name", vol.CreateReq.Name, + "volume_id", vol.CreateReq.ID, + "error", err) + return nil, nil + } + + _, err = os.Stat(vol.HostPath) + if err != nil { + hvm.log.Error("failed to restore: could not verify host path", + "volume_id", vol.ID, "error", err, "path", vol.HostPath) + return nil, nil + } + + return genVolConfig(vol.CreateReq, vol.HostPath), nil +} + // genVolConfig generates the host volume config for the node to report as // available to the servers for job scheduling. 
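// Sketch of the restoreForCreate/restoreForRegister split above, using stand-in
// types (storedVolume, createPlugin) assumed for illustration and "context",
// "fmt", and "os" imports in a scratch package. Either path ends at the same
// place: a host path that genVolConfig turns into the node's fingerprinted
// volume config.
type storedVolume struct {
	ID       string
	Name     string
	PluginID string // empty means the volume was registered, not created
	HostPath string
}

type createPlugin interface {
	Create(ctx context.Context, volID string) (hostPath string, err error)
}

func restoreHostPath(ctx context.Context, vol storedVolume, plug createPlugin) (string, error) {
	if vol.PluginID == "" {
		// registered out-of-band: nothing to re-create, just verify the path
		if _, err := os.Stat(vol.HostPath); err != nil {
			return "", fmt.Errorf("could not verify host path for %q: %w", vol.Name, err)
		}
		return vol.HostPath, nil
	}
	// created by a plugin: plugins must be idempotent, so re-run Create
	return plug.Create(ctx, vol.ID)
}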
-func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig { +func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, hostPath string) *structs.ClientHostVolumeConfig { return &structs.ClientHostVolumeConfig{ Name: req.Name, ID: req.ID, - Path: resp.Path, + Path: hostPath, // dynamic volumes, like CSI, have more robust `capabilities`, // so we always set ReadOnly to false, and let the scheduler diff --git a/client/hostvolumemanager/host_volumes_test.go b/client/hostvolumemanager/host_volumes_test.go index fa4f57af5..43aa9ff81 100644 --- a/client/hostvolumemanager/host_volumes_test.go +++ b/client/hostvolumemanager/host_volumes_test.go @@ -24,29 +24,31 @@ import ( func TestHostVolumeManager(t *testing.T) { log := testlog.HCLogger(t) - tmp := t.TempDir() errDB := &cstate.ErrDB{} memDB := cstate.NewMemDB(log) node := newFakeNode(t) + hostPathCreate := t.TempDir() + hostPathRegister := t.TempDir() + hvm := NewHostVolumeManager(log, Config{ PluginDir: "./test_fixtures", - SharedMountDir: tmp, + SharedMountDir: hostPathCreate, StateMgr: errDB, UpdateNodeVols: node.updateVol, }) - plug := &fakePlugin{mountDir: tmp} + plug := &fakePlugin{mountDir: hostPathCreate} hvm.builtIns["test-plugin"] = plug ctx := timeout(t) t.Run("create", func(t *testing.T) { // plugin doesn't exist - name := "vol-name" + name := "created-volume" req := &cstructs.ClientHostVolumeCreateRequest{ Name: name, - ID: "vol-id", + ID: "vol-id-1", PluginID: "nope", RequestedCapacityMinBytes: 5, @@ -73,7 +75,7 @@ func TestHostVolumeManager(t *testing.T) { // error saving state, successful cleanup _, err = hvm.Create(ctx, req) must.ErrorIs(t, err, cstate.ErrDBError) - must.Eq(t, "vol-id", plug.deleted) + must.Eq(t, "vol-id-1", plug.deleted) assertNotLocked(t, hvm, name) plug.reset() @@ -82,9 +84,9 @@ func TestHostVolumeManager(t *testing.T) { resp, err := hvm.Create(ctx, req) must.NoError(t, err) expectResp := &cstructs.ClientHostVolumeCreateResponse{ - VolumeName: "vol-name", - VolumeID: "vol-id", - HostPath: tmp, + VolumeName: "created-volume", + VolumeID: "vol-id-1", + HostPath: hostPathCreate, CapacityBytes: 5, } must.Eq(t, expectResp, resp) @@ -92,8 +94,8 @@ func TestHostVolumeManager(t *testing.T) { must.NoError(t, err) // should be saved to state must.Len(t, 1, stateDBs) - must.Eq(t, "vol-id", stateDBs[0].ID) - must.Eq(t, "vol-id", stateDBs[0].CreateReq.ID) + must.Eq(t, "vol-id-1", stateDBs[0].ID) + must.Eq(t, "vol-id-1", stateDBs[0].CreateReq.ID) // should be registered with node must.MapContainsKey(t, node.vols, name, must.Sprintf("no %q in %+v", name, node.vols)) assertLocked(t, hvm, name) @@ -114,16 +116,37 @@ func TestHostVolumeManager(t *testing.T) { must.ErrorIs(t, err, ErrVolumeNameExists) }) - // despite being a subtest, this needs to run after "create" + t.Run("register", func(t *testing.T) { + name := "registered-volume" + req := &cstructs.ClientHostVolumeRegisterRequest{ + ID: "vol-id-2", + Name: name, + HostPath: hostPathRegister, + CapacityBytes: 1000, + } + err := hvm.Register(ctx, req) + must.NoError(t, err) + + // should be saved to state and registered with node + stateDBs, err := memDB.GetDynamicHostVolumes() + must.NoError(t, err) + must.Len(t, 2, stateDBs) + must.Eq(t, "vol-id-2", stateDBs[1].ID) + must.Eq(t, "vol-id-2", stateDBs[1].CreateReq.ID) + must.MapContainsKey(t, node.vols, name, must.Sprintf("no %q in %+v", name, node.vols)) + assertLocked(t, hvm, name) + }) + + // despite being a subtest, this needs to run after 
"create" and "register" t.Run("delete", func(t *testing.T) { - name := "vol-name" + name := "created-volume" // should be locked from "create" above assertLocked(t, hvm, name) // plugin doesn't exist req := &cstructs.ClientHostVolumeDeleteRequest{ Name: name, - ID: "vol-id", + ID: "vol-id-1", PluginID: "nope", } _, err := hvm.Delete(ctx, req) @@ -145,27 +168,40 @@ func TestHostVolumeManager(t *testing.T) { assertLocked(t, hvm, name) // happy path - // add stuff that should be deleted hvm.stateMgr = memDB - must.NoError(t, memDB.PutDynamicHostVolume(&cstructs.HostVolumeState{ - ID: "vol-id", - CreateReq: &cstructs.ClientHostVolumeCreateRequest{}, - })) - node.vols = VolumeMap{ - "vol-name": &structs.ClientHostVolumeConfig{}, - } + // and delete it resp, err := hvm.Delete(ctx, req) must.NoError(t, err) must.Eq(t, &cstructs.ClientHostVolumeDeleteResponse{ - VolumeName: "vol-name", - VolumeID: "vol-id", + VolumeName: "created-volume", + VolumeID: "vol-id-1", }, resp) - must.Eq(t, VolumeMap{}, node.vols, must.Sprint("vols should be deleted from node")) + must.Eq(t, VolumeMap{ + "registered-volume": &structs.ClientHostVolumeConfig{ + Name: "registered-volume", + Path: hostPathRegister, + ID: "vol-id-2", + }, + }, node.vols, must.Sprint("created-volume should be deleted from node")) stateVols, err := memDB.GetDynamicHostVolumes() must.NoError(t, err) - must.Nil(t, stateVols, must.Sprint("vols should be deleted from state")) + must.Len(t, 1, stateVols, must.Sprint("only one volume should be deleted")) + assertNotLocked(t, hvm, name) + assertLocked(t, hvm, "registered-volume") + + req.Name = "registered-volume" + req.ID = "vol-id-2" + req.PluginID = "" + resp, err = hvm.Delete(ctx, req) + must.NoError(t, err) + + must.Eq(t, VolumeMap{}, node.vols, must.Sprint("all volumes should be deleted from node")) + stateVols, err = memDB.GetDynamicHostVolumes() + must.NoError(t, err) + must.Nil(t, stateVols, must.Sprint("all volumes should be deleted")) + assertNotLocked(t, hvm, "registered-volume") }) } @@ -223,14 +259,26 @@ func assertNotLocked(t *testing.T, hvm *HostVolumeManager, name string) { func TestHostVolumeManager_restoreFromState(t *testing.T) { log := testlog.HCLogger(t) - vol := &cstructs.HostVolumeState{ - ID: "test-vol-id", + hostPath := t.TempDir() + + vol1 := &cstructs.HostVolumeState{ + ID: "test-vol-id-1", CreateReq: &cstructs.ClientHostVolumeCreateRequest{ - Name: "test-vol-name", - ID: "test-vol-id", + Name: "created-volume", + ID: "test-vol-id-1", PluginID: "mkdir", }, } + vol2 := &cstructs.HostVolumeState{ + ID: "test-vol-id-2", + HostPath: hostPath, + CreateReq: &cstructs.ClientHostVolumeCreateRequest{ + Name: "registered-volume", + ID: "test-vol-id-2", + PluginID: "", // this signifies a Register operation + }, + } + node := newFakeNode(t) t.Run("no vols", func(t *testing.T) { @@ -247,12 +295,13 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) { t.Run("happy", func(t *testing.T) { // put our volume in state state := cstate.NewMemDB(log) - must.NoError(t, state.PutDynamicHostVolume(vol)) + must.NoError(t, state.PutDynamicHostVolume(vol1)) + must.NoError(t, state.PutDynamicHostVolume(vol2)) // new volume manager should load it from state and run Create, // resulting in a volume directory in this mountDir. 
mountDir := t.TempDir() - volPath := filepath.Join(mountDir, vol.ID) + volPath := filepath.Join(mountDir, vol1.ID) hvm := NewHostVolumeManager(log, Config{ StateMgr: state, @@ -265,18 +314,25 @@ func TestHostVolumeManager_restoreFromState(t *testing.T) { must.NoError(t, err) expect := map[string]*structs.ClientHostVolumeConfig{ - "test-vol-name": { - Name: "test-vol-name", - ID: "test-vol-id", + "created-volume": { + Name: "created-volume", + ID: "test-vol-id-1", Path: volPath, ReadOnly: false, }, + "registered-volume": { + Name: "registered-volume", + ID: "test-vol-id-2", + Path: hostPath, + ReadOnly: false, + }, } must.Eq(t, expect, vols) must.DirExists(t, volPath) - assertLocked(t, hvm, "test-vol-name") + assertLocked(t, hvm, "created-volume") + assertLocked(t, hvm, "registered-volume") }) t.Run("state error", func(t *testing.T) { diff --git a/client/structs/host_volumes.go b/client/structs/host_volumes.go index bff543588..2e5121dde 100644 --- a/client/structs/host_volumes.go +++ b/client/structs/host_volumes.go @@ -5,6 +5,7 @@ package structs type HostVolumeState struct { ID string + HostPath string CreateReq *ClientHostVolumeCreateRequest } @@ -40,8 +41,8 @@ type ClientHostVolumeCreateResponse struct { VolumeName string VolumeID string - // Path is the host path where the volume's mount point was created. We send - // this back to the server to make debugging easier. + // HostPath is the host path where the volume's mount point was created. We + // send this back to the server to make debugging easier. HostPath string // Capacity is the size in bytes that was actually provisioned by the host @@ -49,6 +50,33 @@ type ClientHostVolumeCreateResponse struct { CapacityBytes int64 } +type ClientHostVolumeRegisterRequest struct { + // ID is a UUID-like string generated by the server. + ID string + + // Name is the name that group.volume will use to identify the volume + // source. Not expected to be unique cluster-wide, but must be unique per + // node. + Name string + + // NodeID is the node where the volume is placed. It's included in the + // client RPC request so that the server can route the request to the + // correct node. + NodeID string + + // HostPath is the host path where the volume's mount point was created + // out-of-band. + HostPath string + + // Capacity is the size in bytes that was provisioned out-of-band. + CapacityBytes int64 + + // Parameters are an opaque map of parameters for the host volume plugin. + Parameters map[string]string +} + +type ClientHostVolumeRegisterResponse struct{} + type ClientHostVolumeDeleteRequest struct { // ID is a UUID-like string generated by the server. 
ID string diff --git a/command/volume_create_host.go b/command/volume_create_host.go index dc0d1e1ae..4aedc6e87 100644 --- a/command/volume_create_host.go +++ b/command/volume_create_host.go @@ -206,6 +206,7 @@ func decodeHostVolume(input *ast.File) (*api.HostVolume, error) { // Need to manually parse these fields/blocks delete(m, "capability") delete(m, "constraint") + delete(m, "capacity") delete(m, "capacity_max") delete(m, "capacity_min") delete(m, "type") @@ -216,6 +217,11 @@ func decodeHostVolume(input *ast.File) (*api.HostVolume, error) { return nil, err } + capacity, err := parseCapacityBytes(list.Filter("capacity")) + if err != nil { + return nil, fmt.Errorf("invalid capacity: %v", err) + } + vol.CapacityBytes = capacity capacityMin, err := parseCapacityBytes(list.Filter("capacity_min")) if err != nil { return nil, fmt.Errorf("invalid capacity_min: %v", err) diff --git a/command/volume_delete_host_test.go b/command/volume_delete_host_test.go index 5edd72bb7..7215ebb69 100644 --- a/command/volume_delete_host_test.go +++ b/command/volume_delete_host_test.go @@ -30,6 +30,7 @@ func TestHostVolumeDeleteCommand(t *testing.T) { must.NoError(t, err) must.Len(t, 1, nodes) nodeID := nodes[0].ID + hostPath := t.TempDir() ui := cli.NewMockUi() @@ -37,14 +38,15 @@ func TestHostVolumeDeleteCommand(t *testing.T) { namespace = "prod" name = "example" type = "host" -plugin_id = "mkdir" node_id = "%s" node_pool = "default" +host_path = "%s" + capability { access_mode = "single-node-reader-only" attachment_mode = "file-system" } -`, nodeID) +`, nodeID, hostPath) file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl") must.NoError(t, err) diff --git a/command/volume_register_host_test.go b/command/volume_register_host_test.go index 9ed9d5c96..fef179b02 100644 --- a/command/volume_register_host_test.go +++ b/command/volume_register_host_test.go @@ -30,6 +30,8 @@ func TestHostVolumeRegisterCommand_Run(t *testing.T) { must.Len(t, 1, nodes) nodeID := nodes[0].ID + hostPath := t.TempDir() + ui := cli.NewMockUi() cmd := &VolumeRegisterCommand{Meta: Meta{Ui: ui}} @@ -40,9 +42,9 @@ type = "host" plugin_id = "plugin_id" node_id = "%s" node_pool = "default" +host_path = "%s" -capacity = 150000000 -host_path = "/var/nomad/alloc_mounts/example" +capacity = "15GB" capacity_min = "10GiB" capacity_max = "20G" @@ -69,7 +71,7 @@ capability { parameters { foo = "bar" } -`, nodeID) +`, nodeID, hostPath) file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl") must.NoError(t, err) diff --git a/command/volume_status_host_test.go b/command/volume_status_host_test.go index 2da56e903..8df4c11ef 100644 --- a/command/volume_status_host_test.go +++ b/command/volume_status_host_test.go @@ -109,6 +109,7 @@ func TestHostVolumeStatusCommand_Get(t *testing.T) { must.NoError(t, err) must.Len(t, 1, nodes) nodeID := nodes[0].ID + hostPath := t.TempDir() ui := cli.NewMockUi() @@ -119,11 +120,12 @@ type = "host" plugin_id = "mkdir" node_id = "%s" node_pool = "default" +host_path = "%s" capability { access_mode = "single-node-reader-only" attachment_mode = "file-system" } -`, nodeID) +`, nodeID, hostPath) file, err := os.CreateTemp(t.TempDir(), "volume-test-*.hcl") must.NoError(t, err) diff --git a/demo/hostvolume/no-plugin.volume.hcl b/demo/hostvolume/no-plugin.volume.hcl new file mode 100644 index 000000000..ee215bfd0 --- /dev/null +++ b/demo/hostvolume/no-plugin.volume.hcl @@ -0,0 +1,15 @@ +# Copyright (c) HashiCorp, Inc. 
+# SPDX-License-Identifier: BUSL-1.1 + +name = "no-plugin" +type = "host" + +# this volume spec can be used with 'volume register' after filling in the +# host_path and node_id values below +host_path = "TODO" # absolute path of the volume that was created out-of-band +node_id = "TODO" # ID of the node where the volume was created + +capability { + access_mode = "single-node-writer" + attachment_mode = "file-system" +} diff --git a/nomad/client_host_volume_endpoint.go b/nomad/client_host_volume_endpoint.go index 5749643d2..c4afadc65 100644 --- a/nomad/client_host_volume_endpoint.go +++ b/nomad/client_host_volume_endpoint.go @@ -36,6 +36,18 @@ func (c *ClientHostVolume) Create(args *cstructs.ClientHostVolumeCreateRequest, ) } +func (c *ClientHostVolume) Register(args *cstructs.ClientHostVolumeRegisterRequest, reply *cstructs.ClientHostVolumeRegisterResponse) error { + defer metrics.MeasureSince([]string{"nomad", "client_host_node", "register"}, time.Now()) + return c.sendVolumeRPC( + args.NodeID, + "HostVolume.Register", + "ClientHostVolume.Register", + structs.RateMetricWrite, + args, + reply, + ) +} + func (c *ClientHostVolume) Delete(args *cstructs.ClientHostVolumeDeleteRequest, reply *cstructs.ClientHostVolumeDeleteResponse) error { defer metrics.MeasureSince([]string{"nomad", "client_host_volume", "delete"}, time.Now()) return c.sendVolumeRPC( diff --git a/nomad/host_volume_endpoint.go b/nomad/host_volume_endpoint.go index 0c8f2ac4e..fcdd8c887 100644 --- a/nomad/host_volume_endpoint.go +++ b/nomad/host_volume_endpoint.go @@ -230,17 +230,25 @@ func (v *HostVolume) Create(args *structs.HostVolumeCreateRequest, reply *struct // ensure we only try to create a valid volume or make valid updates to a // volume - now := time.Now() snap, err := v.srv.State().Snapshot() if err != nil { return err } - - vol, err = v.validateVolumeUpdate(vol, snap, now) + existing, err := v.validateVolumeUpdate(vol, snap) if err != nil { return err } + // set zero values as needed, possibly from existing + now := time.Now() + vol.CanonicalizeForCreate(existing, now) + + // make sure any nodes or pools actually exist + err = v.validateVolumeForState(vol, snap) + if err != nil { + return fmt.Errorf("validating volume %q against state failed: %v", vol.Name, err) + } + _, err = v.placeHostVolume(snap, vol) if err != nil { return fmt.Errorf("could not place volume %q: %w", vol.Name, err) @@ -320,13 +328,25 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st if vol.NodeID == "" { return errors.New("cannot register volume: node ID is required") } + if vol.HostPath == "" { + return errors.New("cannot register volume: host path is required") + } - now := time.Now() - vol, err = v.validateVolumeUpdate(vol, snap, now) + existing, err := v.validateVolumeUpdate(vol, snap) if err != nil { return err } + // set zero values as needed, possibly from existing + now := time.Now() + vol.CanonicalizeForRegister(existing, now) + + // make sure any nodes or pools actually exist + err = v.validateVolumeForState(vol, snap) + if err != nil { + return fmt.Errorf("validating volume %q against state failed: %v", vol.ID, err) + } + warn, err := v.enforceEnterprisePolicy( snap, vol, args.GetIdentity().GetACLToken(), args.PolicyOverride) if warn != nil { @@ -336,6 +356,15 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st return err } + // Attempt to register the volume on the client. 
+ // + // NOTE: registering the volume on the client via the plugin can't be made + // atomic with the registration. + err = v.registerVolume(vol) + if err != nil { + return err + } + // Write a newly created or modified volume to raft. We create a new request // here because we've likely mutated the volume. _, index, err := v.srv.raftApply(structs.HostVolumeRegisterRequestType, @@ -354,9 +383,7 @@ func (v *HostVolume) Register(args *structs.HostVolumeRegisterRequest, reply *st } func (v *HostVolume) validateVolumeUpdate( - vol *structs.HostVolume, - snap *state.StateSnapshot, - now time.Time) (*structs.HostVolume, error) { + vol *structs.HostVolume, snap *state.StateSnapshot) (*structs.HostVolume, error) { // validate the volume spec err := vol.Validate() @@ -366,7 +393,6 @@ func (v *HostVolume) validateVolumeUpdate( // validate any update we're making var existing *structs.HostVolume - volID := vol.ID if vol.ID != "" { existing, err = snap.HostVolumeByID(nil, vol.Namespace, vol.ID, true) if err != nil { @@ -374,27 +400,13 @@ func (v *HostVolume) validateVolumeUpdate( } if existing == nil { return nil, fmt.Errorf("cannot update volume %q: volume does not exist", vol.ID) - } err = vol.ValidateUpdate(existing) if err != nil { - return nil, fmt.Errorf("validating volume %q update failed: %v", vol.ID, err) + return existing, fmt.Errorf("validating volume %q update failed: %v", vol.ID, err) } - } else { - // capture this for nicer error messages later - volID = vol.Name } - - // set zero values as needed, possibly from existing - vol.CanonicalizeForUpdate(existing, now) - - // make sure any nodes or pools actually exist - err = v.validateVolumeForState(vol, snap) - if err != nil { - return nil, fmt.Errorf("validating volume %q against state failed: %v", volID, err) - } - - return vol, nil + return existing, nil } // validateVolumeForState ensures that any references to node IDs or node pools are valid @@ -455,6 +467,30 @@ func (v *HostVolume) createVolume(vol *structs.HostVolume) error { return nil } +func (v *HostVolume) registerVolume(vol *structs.HostVolume) error { + + method := "ClientHostVolume.Register" + cReq := &cstructs.ClientHostVolumeRegisterRequest{ + ID: vol.ID, + Name: vol.Name, + NodeID: vol.NodeID, + HostPath: vol.HostPath, + CapacityBytes: vol.CapacityBytes, + Parameters: vol.Parameters, + } + cResp := &cstructs.ClientHostVolumeRegisterResponse{} + err := v.srv.RPC(method, cReq, cResp) + if err != nil { + return err + } + + if vol.State == structs.HostVolumeStateUnknown { + vol.State = structs.HostVolumeStatePending + } + + return nil +} + // placeHostVolume adds a node to volumes that don't already have one. The node // will match the node pool and constraints, which doesn't already have a volume // by that name. 
It returns the node (for testing) and an error indicating diff --git a/nomad/host_volume_endpoint_test.go b/nomad/host_volume_endpoint_test.go index 544390aec..ae5bf08c9 100644 --- a/nomad/host_volume_endpoint_test.go +++ b/nomad/host_volume_endpoint_test.go @@ -20,7 +20,6 @@ import ( "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/hashicorp/nomad/version" @@ -423,31 +422,25 @@ func TestHostVolumeEndpoint_List(t *testing.T) { nspace2.Name = ns2 must.NoError(t, store.UpsertNamespaces(index, []*structs.Namespace{nspace1, nspace2})) - nodes := []*structs.Node{ - mock.Node(), - mock.Node(), - mock.Node(), - } - nodes[2].NodePool = "prod" - index++ - must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, - index, nodes[0], state.NodeUpsertWithNodePool)) - must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, - index, nodes[1], state.NodeUpsertWithNodePool)) - must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, - index, nodes[2], state.NodeUpsertWithNodePool)) + _, node0 := newMockHostVolumeClient(t, srv, "default") + _, node1 := newMockHostVolumeClient(t, srv, "default") + _, node2 := newMockHostVolumeClient(t, srv, "prod") - vol1 := mock.HostVolumeRequestForNode(ns1, nodes[0]) + vol1 := mock.HostVolumeRequestForNode(ns1, node0) vol1.Name = "foobar-example" + vol1.HostPath = "/tmp/vol1" - vol2 := mock.HostVolumeRequestForNode(ns1, nodes[1]) + vol2 := mock.HostVolumeRequestForNode(ns1, node1) vol2.Name = "foobaz-example" + vol2.HostPath = "/tmp/vol2" - vol3 := mock.HostVolumeRequestForNode(ns2, nodes[2]) + vol3 := mock.HostVolumeRequestForNode(ns2, node2) vol3.Name = "foobar-example" + vol3.HostPath = "/tmp/vol3" - vol4 := mock.HostVolumeRequestForNode(ns2, nodes[1]) + vol4 := mock.HostVolumeRequestForNode(ns2, node1) vol4.Name = "foobaz-example" + vol4.HostPath = "/tmp/vol4" // we need to register these rather than upsert them so we have the correct // indexes for unblocking later. 
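// Sketch of the ordering the server-side Register RPC now follows, with
// hypothetical clientRPC/raftApply function values standing in for the real
// server plumbing: the client RPC runs before the raft write, so a node that
// cannot verify the host path keeps the volume out of server state entirely,
// at the cost of the two steps not being atomic.
func registerFlow(
	vol *volumeStub,
	clientRPC func(*volumeStub) error,
	raftApply func(*volumeStub) error,
) error {
	if err := clientRPC(vol); err != nil { // fingerprint + client state on the node
		return err
	}
	if vol.state == "unknown" {
		vol.state = "pending" // scheduler-visible once the raft write lands
	}
	return raftApply(vol) // then persist the volume in server state
}

type volumeStub struct {
	name, hostPath, state string
}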
@@ -538,7 +531,7 @@ func TestHostVolumeEndpoint_List(t *testing.T) { { name: "query by node", req: &structs.HostVolumeListRequest{ - NodeID: nodes[1].ID, + NodeID: node1.ID, QueryOptions: structs.QueryOptions{ Region: srv.Region(), Namespace: structs.AllNamespacesSentinel, @@ -562,7 +555,7 @@ func TestHostVolumeEndpoint_List(t *testing.T) { { name: "query by incompatible node ID and pool", req: &structs.HostVolumeListRequest{ - NodeID: nodes[1].ID, + NodeID: node1.ID, NodePool: "prod", QueryOptions: structs.QueryOptions{ Region: srv.Region(), @@ -776,6 +769,7 @@ type mockHostVolumeClient struct { lock sync.Mutex nextCreateResponse *cstructs.ClientHostVolumeCreateResponse nextCreateErr error + nextRegisterErr error nextDeleteErr error } @@ -837,6 +831,15 @@ func (v *mockHostVolumeClient) Create( return v.nextCreateErr } +func (v *mockHostVolumeClient) Register( + req *cstructs.ClientHostVolumeRegisterRequest, + resp *cstructs.ClientHostVolumeRegisterResponse) error { + v.lock.Lock() + defer v.lock.Unlock() + *resp = cstructs.ClientHostVolumeRegisterResponse{} + return v.nextRegisterErr +} + func (v *mockHostVolumeClient) Delete( req *cstructs.ClientHostVolumeDeleteRequest, resp *cstructs.ClientHostVolumeDeleteResponse) error { diff --git a/nomad/structs/host_volumes.go b/nomad/structs/host_volumes.go index 440ad9565..d68c1c0b4 100644 --- a/nomad/structs/host_volumes.go +++ b/nomad/structs/host_volumes.go @@ -207,11 +207,11 @@ func (hv *HostVolume) ValidateUpdate(existing *HostVolume) error { const DefaultHostVolumePlugin = "default" -// CanonicalizeForUpdate is called in the RPC handler to ensure we call client +// CanonicalizeForCreate is called in the RPC handler to ensure we call client // RPCs with correctly populated fields from the existing volume, even if the // RPC request includes otherwise valid zero-values. This method should be // called on request objects or a copy, never on a state store object directly. -func (hv *HostVolume) CanonicalizeForUpdate(existing *HostVolume, now time.Time) { +func (hv *HostVolume) CanonicalizeForCreate(existing *HostVolume, now time.Time) { if existing == nil { hv.ID = uuid.Generate() if hv.PluginID == "" { @@ -235,6 +235,27 @@ func (hv *HostVolume) CanonicalizeForUpdate(existing *HostVolume, now time.Time) hv.Allocations = nil // set on read only } +// CanonicalizeForRegister is called in the RPC handler to ensure we call client +// RPCs with correctly populated fields from the existing volume, even if the +// RPC request includes otherwise valid zero-values. This method should be +// called on request objects or a copy, never on a state store object directly. 
+func (hv *HostVolume) CanonicalizeForRegister(existing *HostVolume, now time.Time) { + if existing == nil { + hv.ID = uuid.Generate() + hv.CreateTime = now.UnixNano() + } else { + hv.PluginID = existing.PluginID + hv.NodePool = existing.NodePool + hv.NodeID = existing.NodeID + hv.Constraints = existing.Constraints + hv.CreateTime = existing.CreateTime + } + + hv.State = HostVolumeStatePending // reset on any change + hv.ModifyTime = now.UnixNano() + hv.Allocations = nil // set on read only +} + // GetNamespace implements the paginator.NamespaceGetter interface func (hv *HostVolume) GetNamespace() string { return hv.Namespace diff --git a/nomad/structs/host_volumes_test.go b/nomad/structs/host_volumes_test.go index 2a03e838d..4830f0bb4 100644 --- a/nomad/structs/host_volumes_test.go +++ b/nomad/structs/host_volumes_test.go @@ -169,7 +169,7 @@ func TestHostVolume_ValidateUpdate(t *testing.T) { } -func TestHostVolume_CanonicalizeForUpdate(t *testing.T) { +func TestHostVolume_Canonicalize(t *testing.T) { now := time.Now() vol := &HostVolume{ CapacityBytes: 100000, @@ -179,7 +179,7 @@ func TestHostVolume_CanonicalizeForUpdate(t *testing.T) { {ID: "7032e570"}, }, } - vol.CanonicalizeForUpdate(nil, now) + vol.CanonicalizeForCreate(nil, now) must.NotEq(t, "", vol.ID) must.Eq(t, now.UnixNano(), vol.CreateTime) @@ -224,7 +224,7 @@ func TestHostVolume_CanonicalizeForUpdate(t *testing.T) { CreateTime: 1, } - vol.CanonicalizeForUpdate(existing, now) + vol.CanonicalizeForCreate(existing, now) must.Eq(t, existing.ID, vol.ID) must.Eq(t, existing.PluginID, vol.PluginID) must.Eq(t, existing.NodePool, vol.NodePool)
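A minimal usage sketch of the register path's canonicalization, based on the CanonicalizeForRegister implementation added above; the field values are illustrative. With no existing volume it generates an ID and CreateTime, always resets the state to pending, and, unlike CanonicalizeForCreate, never fills in a default plugin ID.

package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/helper/uuid"
	"github.com/hashicorp/nomad/nomad/structs"
)

func main() {
	vol := &structs.HostVolume{
		Name:          "registered-volume",
		NodeID:        uuid.Generate(),
		HostPath:      "/mnt/data/registered-volume",
		CapacityBytes: 1000,
	}
	vol.CanonicalizeForRegister(nil, time.Now())

	fmt.Println(vol.ID != "")                                // true: generated server-side
	fmt.Println(vol.State == structs.HostVolumeStatePending) // true: reset on any change
	fmt.Println(vol.PluginID == "")                          // true: register never defaults a plugin
}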