From 0a08ddb08352f7ffac2852c0a94fda680ff90781 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 21 Nov 2024 11:45:59 -0500 Subject: [PATCH] dynamic host volumes: update volume from node fingerprint (#24521) When dynamic host volumes are created, they're written to the state store in a "pending" state. Once the client fingerprints the volume it's eligible for scheduling, so we mark the state as ready at that point. Because the fingerprint could potentially be returned before the RPC handler has a chance to write to the state store, this changeset adds test coverage to verify that upserts of pending volumes check the node for a previously-fingerprinted volume as well. Ref: https://github.com/hashicorp/nomad/pull/24479 --- nomad/state/state_store.go | 3 + nomad/state/state_store_host_volumes.go | 34 +++++++- nomad/state/state_store_host_volumes_test.go | 89 ++++++++++++++++++++ 3 files changed, 125 insertions(+), 1 deletion(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 545c3f320..65ce87813 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -1048,6 +1048,9 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error { if err := upsertCSIPluginsForNode(txn, node, index); err != nil { return fmt.Errorf("csi plugin update failed: %v", err) } + if err := upsertHostVolumeForNode(txn, node, index); err != nil { + return fmt.Errorf("dynamic host volumes update failed: %v", err) + } return nil } diff --git a/nomad/state/state_store_host_volumes.go b/nomad/state/state_store_host_volumes.go index 27013b05d..dd5a68040 100644 --- a/nomad/state/state_store_host_volumes.go +++ b/nomad/state/state_store_host_volumes.go @@ -75,7 +75,7 @@ func (s *StateStore) UpsertHostVolumes(index uint64, volumes []*structs.HostVolu } // If the fingerprint is written from the node before the create RPC - // handler completes, we'll never update from the
initial pending, so // reconcile that here node, err := s.NodeByID(nil, v.NodeID) if err != nil { return err } @@ -190,3 +190,35 @@ func (s *StateStore) hostVolumesIter(ws memdb.WatchSet, index string, sort SortO ws.Add(iter.WatchCh()) return iter, nil } + +// upsertHostVolumeForNode sets newly fingerprinted host volumes to ready state +func upsertHostVolumeForNode(txn *txn, node *structs.Node, index uint64) error { + if len(node.HostVolumes) == 0 { + return nil + } + iter, err := txn.Get(TableHostVolumes, indexNodeID, node.ID) + if err != nil { + return err + } + for { + raw := iter.Next() + if raw == nil { + return nil + } + vol := raw.(*structs.HostVolume) + switch vol.State { + case structs.HostVolumeStateUnknown, structs.HostVolumeStatePending: + if _, ok := node.HostVolumes[vol.Name]; ok { + vol = vol.Copy() + vol.State = structs.HostVolumeStateReady + vol.ModifyIndex = index + err = txn.Insert(TableHostVolumes, vol) + if err != nil { + return fmt.Errorf("host volume insert: %w", err) + } + } + default: + // don't touch ready or soft-deleted volumes + } + } +} diff --git a/nomad/state/state_store_host_volumes_test.go b/nomad/state/state_store_host_volumes_test.go index 11b837115..af4c77a72 100644 --- a/nomad/state/state_store_host_volumes_test.go +++ b/nomad/state/state_store_host_volumes_test.go @@ -9,6 +9,7 @@ import ( memdb "github.com/hashicorp/go-memdb" "github.com/hashicorp/nomad/ci" + "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" "github.com/shoenig/test/must" @@ -163,3 +164,91 @@ func TestStateStore_HostVolumes_CRUD(t *testing.T) { got = consumeIter(iter) must.MapLen(t, 3, got, must.Sprint(`expected 3 volumes remain`)) } + +func TestStateStore_UpdateHostVolumesFromFingerprint(t *testing.T) { + ci.Parallel(t) + store := testStateStore(t) + index, err := store.LatestIndex() + must.NoError(t, err) + + node := mock.Node() + node.HostVolumes = map[string]*structs.ClientHostVolumeConfig{ +
"static-vol": {Name: "static-vol", Path: "/srv/static"}, + "dhv-zero": {Name: "dhv-zero", Path: "/var/nomad/alloc_mounts" + uuid.Generate()}, + } + index++ + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, + index, node, NodeUpsertWithNodePool)) + otherNode := mock.Node() + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, + index, otherNode, NodeUpsertWithNodePool)) + + ns := structs.DefaultNamespace + + vols := []*structs.HostVolume{ + mock.HostVolume(), + mock.HostVolume(), + mock.HostVolume(), + mock.HostVolume(), + } + + // a volume that's been fingerprinted before we can write it to state + vols[0].Name = "dhv-zero" + vols[0].NodeID = node.ID + + // a volume that will match the new fingerprint + vols[1].Name = "dhv-one" + vols[1].NodeID = node.ID + + // a volume that matches the new fingerprint but on the wrong node + vols[2].Name = "dhv-one" + vols[2].NodeID = otherNode.ID + + // a volume that won't be fingerprinted + vols[3].Name = "dhv-two" + vols[3].NodeID = node.ID + + index++ + oldIndex := index + must.NoError(t, store.UpsertHostVolumes(index, vols)) + + vol0, err := store.HostVolumeByID(nil, ns, vols[0].ID, false) + must.NoError(t, err) + must.Eq(t, structs.HostVolumeStateReady, vol0.State, + must.Sprint("previously-fingerprinted volume should be in ready state")) + + // update the fingerprint + + node = node.Copy() + node.HostVolumes["dhv-one"] = &structs.ClientHostVolumeConfig{ + Name: "dhv-one", + Path: "/var/nomad/alloc_mounts" + uuid.Generate(), + } + + index++ + must.NoError(t, store.UpsertNode(structs.MsgTypeTestSetup, index, node)) + + vol0, err = store.HostVolumeByID(nil, ns, vols[0].ID, false) + must.NoError(t, err) + must.Eq(t, oldIndex, vol0.ModifyIndex, must.Sprint("expected no further update")) + must.Eq(t, structs.HostVolumeStateReady, vol0.State) + + vol1, err := store.HostVolumeByID(nil, ns, vols[1].ID, false) + must.NoError(t, err) + must.Eq(t, index, vol1.ModifyIndex, + must.Sprint("fingerprint should update
pending volume")) + must.Eq(t, structs.HostVolumeStateReady, vol1.State) + + vol2, err := store.HostVolumeByID(nil, ns, vols[2].ID, false) + must.NoError(t, err) + must.Eq(t, oldIndex, vol2.ModifyIndex, + must.Sprint("volume on other node should not change")) + must.Eq(t, structs.HostVolumeStatePending, vol2.State) + + vol3, err := store.HostVolumeByID(nil, ns, vols[3].ID, false) + must.NoError(t, err) + must.Eq(t, oldIndex, vol3.ModifyIndex, + must.Sprint("volume not fingerprinted should not change")) + must.Eq(t, structs.HostVolumeStatePending, vol3.State) + +}