mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 18:05:42 +03:00
dynamic host volumes: update volume from node fingerprint (#24521)
When dynamic host volumes are created, they're written to the state store in a "pending" state. Once the client fingerprints the volume it's eligible for scheduling, so we mark the state as ready at that point. Because the fingerprint could potentially be returned before the RPC handler has a chance to write to the state store, this changeset adds test coverage to verify that upserts of pending volumes check the node for a previously-fingerprinted volume as well. Ref: https://github.com/hashicorp/nomad/pull/24479
This commit is contained in:
@@ -1048,6 +1048,9 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
|
||||
if err := upsertCSIPluginsForNode(txn, node, index); err != nil {
|
||||
return fmt.Errorf("csi plugin update failed: %v", err)
|
||||
}
|
||||
if err := upsertHostVolumeForNode(txn, node, index); err != nil {
|
||||
return fmt.Errorf("dynamic host volumes update failed: %v", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -75,7 +75,7 @@ func (s *StateStore) UpsertHostVolumes(index uint64, volumes []*structs.HostVolu
|
||||
}
|
||||
|
||||
// If the fingerprint is written from the node before the create RPC
|
||||
// handler completes, we'll never update from the initial pending , so
|
||||
// handler completes, we'll never update from the initial pending, so
|
||||
// reconcile that here
|
||||
node, err := s.NodeByID(nil, v.NodeID)
|
||||
if err != nil {
|
||||
@@ -190,3 +190,35 @@ func (s *StateStore) hostVolumesIter(ws memdb.WatchSet, index string, sort SortO
|
||||
ws.Add(iter.WatchCh())
|
||||
return iter, nil
|
||||
}
|
||||
|
||||
// upsertHostVolumeForNode sets newly fingerprinted host volumes to ready state
|
||||
func upsertHostVolumeForNode(txn *txn, node *structs.Node, index uint64) error {
|
||||
if len(node.HostVolumes) == 0 {
|
||||
return nil
|
||||
}
|
||||
iter, err := txn.Get(TableHostVolumes, indexNodeID, node.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
for {
|
||||
raw := iter.Next()
|
||||
if raw == nil {
|
||||
return nil
|
||||
}
|
||||
vol := raw.(*structs.HostVolume)
|
||||
switch vol.State {
|
||||
case structs.HostVolumeStateUnknown, structs.HostVolumeStatePending:
|
||||
if _, ok := node.HostVolumes[vol.Name]; ok {
|
||||
vol = vol.Copy()
|
||||
vol.State = structs.HostVolumeStateReady
|
||||
vol.ModifyIndex = index
|
||||
err = txn.Insert(TableHostVolumes, vol)
|
||||
if err != nil {
|
||||
return fmt.Errorf("host volume insert: %w", err)
|
||||
}
|
||||
}
|
||||
default:
|
||||
// don't touch ready or soft-deleted volumes
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/nomad/ci"
|
||||
"github.com/hashicorp/nomad/helper/uuid"
|
||||
"github.com/hashicorp/nomad/nomad/mock"
|
||||
"github.com/hashicorp/nomad/nomad/structs"
|
||||
"github.com/shoenig/test/must"
|
||||
@@ -163,3 +164,91 @@ func TestStateStore_HostVolumes_CRUD(t *testing.T) {
|
||||
got = consumeIter(iter)
|
||||
must.MapLen(t, 3, got, must.Sprint(`expected 3 volumes remain`))
|
||||
}
|
||||
|
||||
func TestStateStore_UpdateHostVolumesFromFingerprint(t *testing.T) {
|
||||
ci.Parallel(t)
|
||||
store := testStateStore(t)
|
||||
index, err := store.LatestIndex()
|
||||
must.NoError(t, err)
|
||||
|
||||
node := mock.Node()
|
||||
node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{
|
||||
"static-vol": {Name: "static-vol", Path: "/srv/static"},
|
||||
"dhv-zero": {Name: "dhv-zero", Path: "/var/nomad/alloc_mounts" + uuid.Generate()},
|
||||
}
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
|
||||
index, node, NodeUpsertWithNodePool))
|
||||
otherNode := mock.Node()
|
||||
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup,
|
||||
index, otherNode, NodeUpsertWithNodePool))
|
||||
|
||||
ns := structs.DefaultNamespace
|
||||
|
||||
vols := []*structs.HostVolume{
|
||||
mock.HostVolume(),
|
||||
mock.HostVolume(),
|
||||
mock.HostVolume(),
|
||||
mock.HostVolume(),
|
||||
}
|
||||
|
||||
// a volume that's been fingerprinted before we can write it to state
|
||||
vols[0].Name = "dhv-zero"
|
||||
vols[0].NodeID = node.ID
|
||||
|
||||
// a volume that will match the new fingerprint
|
||||
vols[1].Name = "dhv-one"
|
||||
vols[1].NodeID = node.ID
|
||||
|
||||
// a volume that matches the new fingerprint but on the wrong node
|
||||
vols[2].Name = "dhv-one"
|
||||
vols[2].NodeID = otherNode.ID
|
||||
|
||||
// a volume that won't be fingerprinted
|
||||
vols[3].Name = "dhv-two"
|
||||
vols[3].NodeID = node.ID
|
||||
|
||||
index++
|
||||
oldIndex := index
|
||||
must.NoError(t, store.UpsertHostVolumes(index, vols))
|
||||
|
||||
vol0, err := store.HostVolumeByID(nil, ns, vols[0].ID, false)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, structs.HostVolumeStateReady, vol0.State,
|
||||
must.Sprint("previously-fingerprinted volume should be in ready state"))
|
||||
|
||||
// update the fingerprint
|
||||
|
||||
node = node.Copy()
|
||||
node.HostVolumes["dhv-one"] = &structs.ClientHostVolumeConfig{
|
||||
Name: "dhv-one",
|
||||
Path: "/var/nomad/alloc_mounts" + uuid.Generate(),
|
||||
}
|
||||
|
||||
index++
|
||||
must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node))
|
||||
|
||||
vol0, err = store.HostVolumeByID(nil, ns, vols[0].ID, false)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, oldIndex, vol0.ModifyIndex, must.Sprint("expected no further update"))
|
||||
must.Eq(t, structs.HostVolumeStateReady, vol0.State)
|
||||
|
||||
vol1, err := store.HostVolumeByID(nil, ns, vols[1].ID, false)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, index, vol1.ModifyIndex,
|
||||
must.Sprint("fingerprint should update pending volume"))
|
||||
must.Eq(t, structs.HostVolumeStateReady, vol1.State)
|
||||
|
||||
vol2, err := store.HostVolumeByID(nil, ns, vols[2].ID, false)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, oldIndex, vol2.ModifyIndex,
|
||||
must.Sprint("volume on other node should not change"))
|
||||
must.Eq(t, structs.HostVolumeStatePending, vol2.State)
|
||||
|
||||
vol3, err := store.HostVolumeByID(nil, ns, vols[3].ID, false)
|
||||
must.NoError(t, err)
|
||||
must.Eq(t, oldIndex, vol3.ModifyIndex,
|
||||
must.Sprint("volume not fingerprinted should not change"))
|
||||
must.Eq(t, structs.HostVolumeStatePending, vol3.State)
|
||||
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user