From 9f850016bdf8bf296b6cf59f4c15d412abcef3c8 Mon Sep 17 00:00:00 2001 From: Lang Martin Date: Mon, 3 Feb 2020 11:59:00 -0500 Subject: [PATCH] csi: fix index maintenance for CSIVolume and CSIPlugin tables (#7049) * state_store: csi volumes/plugins store the index in the txn * nomad: csi_endpoint_test require index checks need uint64() * nomad: other tests using int 0 not uint64(0) * structs: pass index into New, but not other struct methods * state_store: csi plugin indexes, use new struct interface * nomad: csi_endpoint_test check index/query meta (on explicit 0) * structs: NewCSIVolume takes an index arg now * scheduler/test: NewCSIVolume takes an index arg now --- nomad/csi_endpoint_test.go | 18 +++++++------ nomad/node_endpoint_test.go | 4 +-- nomad/periodic_endpoint_test.go | 4 +-- nomad/state/state_store.go | 48 ++++++++++++++++++++++++++++----- nomad/state/state_store_test.go | 6 +++-- nomad/structs/csi.go | 20 ++++++-------- nomad/structs/csi_test.go | 2 +- scheduler/feasible_test.go | 2 +- scheduler/stack_test.go | 3 ++- 9 files changed, 71 insertions(+), 36 deletions(-) diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go index bb1cac005..c4d313e96 100644 --- a/nomad/csi_endpoint_test.go +++ b/nomad/csi_endpoint_test.go @@ -40,7 +40,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, PluginID: "minnie", }} - err := state.CSIVolumeRegister(0, vols) + err := state.CSIVolumeRegister(999, vols) require.NoError(t, err) // Create the register request @@ -56,7 +56,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) { var resp structs.CSIVolumeGetResponse err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, &resp) require.NoError(t, err) - require.NotEqual(t, 0, resp.Index) + require.Equal(t, uint64(999), resp.Index) require.Equal(t, vols[0].ID, resp.Volume.ID) } @@ -104,7 +104,7 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) { resp1 := &structs.CSIVolumeRegisterResponse{} err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Register", req1, resp1) require.NoError(t, err) - require.NotEqual(t, 0, resp1.Index) + require.NotEqual(t, uint64(0), resp1.Index) // Get the volume back out policy = mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess}) @@ -120,7 +120,7 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) { resp2 := &structs.CSIVolumeGetResponse{} err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req2, resp2) require.NoError(t, err) - require.NotEqual(t, 0, resp2.Index) + require.Equal(t, resp1.Index, resp2.Index) require.Equal(t, vols[0].ID, resp2.Volume.ID) // Registration does not update @@ -190,7 +190,7 @@ func TestCSIVolumeEndpoint_List(t *testing.T) { AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, PluginID: "paddy", }} - err := state.CSIVolumeRegister(0, vols) + err := state.CSIVolumeRegister(999, vols) require.NoError(t, err) var resp structs.CSIVolumeListResponse @@ -204,7 +204,7 @@ func TestCSIVolumeEndpoint_List(t *testing.T) { } err = msgpackrpc.CallWithCodec(codec, "CSIVolume.List", req, &resp) require.NoError(t, err) - require.NotEqual(t, 0, resp.Index) + require.Equal(t, uint64(999), resp.Index) require.Equal(t, 2, len(resp.Volumes)) ids := map[string]bool{vols[0].ID: true, vols[1].ID: true} for _, v := range resp.Volumes { @@ -280,7 +280,7 @@ func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) { resp1 := &structs.JobRegisterResponse{} err := msgpackrpc.CallWithCodec(codec, "Job.Register", req1, resp1) require.NoError(t, err) - require.NotEqual(t, 0, resp1.Index) + require.NotEqual(t, uint64(0), resp1.Index) // Get the plugin back out policy = mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess}) @@ -296,7 +296,8 @@ func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) { resp2 := &structs.CSIPluginGetResponse{} err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) require.NoError(t, err) - require.NotEqual(t, 0, resp2.Index) + // The job is created with a higher index than the plugin, there's an extra raft write + require.Greater(t, resp1.Index, resp2.Index) // List plugins req3 := &structs.CSIPluginListRequest{ @@ -323,6 +324,7 @@ func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) { resp4 := &structs.JobDeregisterResponse{} err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req4, resp4) require.NoError(t, err) + require.Less(t, resp2.Index, resp4.Index) // Plugin is missing err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2) diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index 9d7ebe5ec..3ff670887 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2141,7 +2141,7 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) { start := time.Now() err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2) require.Nil(err) - require.NotEqual(0, resp2.Index) + require.NotEqual(uint64(0), resp2.Index) if diff := time.Since(start); diff < batchUpdateInterval { t.Fatalf("too fast: %v", diff) @@ -3235,7 +3235,7 @@ func TestClientEndpoint_EmitEvents(t *testing.T) { var resp structs.GenericResponse err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvents", &req, &resp) require.Nil(err) - require.NotEqual(0, resp.Index) + require.NotEqual(uint64(0), resp.Index) // Check for the node in the FSM ws := memdb.NewWatchSet() diff --git a/nomad/periodic_endpoint_test.go b/nomad/periodic_endpoint_test.go index 0f77b645f..45b70b880 100644 --- a/nomad/periodic_endpoint_test.go +++ b/nomad/periodic_endpoint_test.go @@ -116,7 +116,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) { req.AuthToken = token.SecretID var resp structs.PeriodicForceResponse assert.Nil(msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp)) - assert.NotEqual(0, resp.Index) + assert.NotEqual(uint64(0), resp.Index) // Lookup the evaluation ws := memdb.NewWatchSet() @@ -132,7 +132,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) { req.AuthToken = root.SecretID var resp structs.PeriodicForceResponse assert.Nil(msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp)) - assert.NotEqual(0, resp.Index) + assert.NotEqual(uint64(0), resp.Index) // Lookup the evaluation ws := memdb.NewWatchSet() diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 4d578bfc3..96cc3ae6a 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -954,13 +954,14 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro var plug *structs.CSIPlugin if raw != nil { - plug = raw.(*structs.CSIPlugin).Copy(index) + plug = raw.(*structs.CSIPlugin).Copy() } else { plug = structs.NewCSIPlugin(info.PluginID, index) plug.ControllerRequired = info.RequiresControllerPlugin } - plug.AddPlugin(node.ID, info, index) + plug.AddPlugin(node.ID, info) + plug.ModifyIndex = index err = txn.Insert("csi_plugins", plug) if err != nil { @@ -984,6 +985,10 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro } } + if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return nil } @@ -1010,8 +1015,9 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro return fmt.Errorf("csi_plugins missing plugin %s", id) } - plug := raw.(*structs.CSIPlugin).Copy(index) - plug.DeleteNode(node.ID, index) + plug := raw.(*structs.CSIPlugin).Copy() + plug.DeleteNode(node.ID) + plug.ModifyIndex = index if plug.IsEmpty() { err := txn.Delete("csi_plugins", plug) @@ -1026,6 +1032,10 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro } } + if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return nil } @@ -1644,6 +1654,10 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum } } + if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + txn.Commit() return nil } @@ -1710,16 +1724,22 @@ func (s *StateStore) CSIVolumeClaim(index uint64, id string, alloc *structs.Allo return fmt.Errorf("volume row conversion error") } - volume := orig.Copy(index) + volume := orig.Copy() if !volume.Claim(claim, alloc) { return fmt.Errorf("volume max claim reached") } + volume.ModifyIndex = index + if err = txn.Insert("csi_volumes", volume); err != nil { return fmt.Errorf("volume update failed: %s: %v", id, err) } + if err = txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + txn.Commit() return nil } @@ -1744,6 +1764,10 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, ids []string) error { } } + if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + txn.Commit() return nil } @@ -1759,7 +1783,7 @@ func (s *StateStore) upsertJobCSIPlugins(index uint64, job *structs.Job, txn *me // Append this job to all of them for _, plug := range plugs { if plug.CreateIndex != index { - plug = plug.Copy(index) + plug = plug.Copy() } plug.AddJob(job) @@ -1770,6 +1794,10 @@ func (s *StateStore) upsertJobCSIPlugins(index uint64, job *structs.Job, txn *me } } + if err = txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return nil } @@ -1819,10 +1847,12 @@ func (s *StateStore) deleteJobCSIPlugins(index uint64, job *structs.Job, txn *me // Remove this job from each plugin. If the plugin has no jobs left, remove it for _, plug := range plugs { if plug.CreateIndex != index { - plug = plug.Copy(index) + plug = plug.Copy() } plug.DeleteJob(job) + plug.ModifyIndex = index + if plug.IsEmpty() { err = txn.Delete("csi_plugins", plug) } else { @@ -1834,6 +1864,10 @@ func (s *StateStore) deleteJobCSIPlugins(index uint64, job *structs.Job, txn *me } } + if err = txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil { + return fmt.Errorf("index update failed: %v", err) + } + return nil } diff --git a/nomad/state/state_store_test.go b/nomad/state/state_store_test.go index df24da193..c965edf45 100644 --- a/nomad/state/state_store_test.go +++ b/nomad/state/state_store_test.go @@ -2831,8 +2831,9 @@ func TestStateStore_CSIVolume(t *testing.T) { state := testStateStore(t) id0, id1 := uuid.Generate(), uuid.Generate() + index := uint64(1000) - v0 := structs.NewCSIVolume("foo") + v0 := structs.NewCSIVolume("foo", index) v0.ID = id0 v0.Namespace = "default" v0.PluginID = "minnie" @@ -2840,7 +2841,8 @@ func TestStateStore_CSIVolume(t *testing.T) { v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem - v1 := structs.NewCSIVolume("foo") + index++ + v1 := structs.NewCSIVolume("foo", index) v1.ID = id1 v1.Namespace = "default" v1.PluginID = "adam" diff --git a/nomad/structs/csi.go b/nomad/structs/csi.go index f0dcc0251..8dca6c9f8 100644 --- a/nomad/structs/csi.go +++ b/nomad/structs/csi.go @@ -185,9 +185,11 @@ type CSIVolListStub struct { } // NewCSIVolume creates the volume struct. No side-effects -func NewCSIVolume(pluginID string) *CSIVolume { +func NewCSIVolume(pluginID string, index uint64) *CSIVolume { out := &CSIVolume{ - ID: pluginID, + ID: pluginID, + CreateIndex: index, + ModifyIndex: index, } out.newStructs() @@ -250,11 +252,10 @@ func (v *CSIVolume) CanWrite() bool { } // Copy returns a copy of the volume, which shares only the Topologies slice -func (v *CSIVolume) Copy(index uint64) *CSIVolume { +func (v *CSIVolume) Copy() *CSIVolume { copy := *v out := © out.newStructs() - out.ModifyIndex = index for k, v := range v.ReadAllocs { out.ReadAllocs[k] = v @@ -486,11 +487,10 @@ func (p *CSIPlugin) newStructs() { p.Nodes = map[string]*CSIInfo{} } -func (p *CSIPlugin) Copy(index uint64) *CSIPlugin { +func (p *CSIPlugin) Copy() *CSIPlugin { copy := *p out := © out.newStructs() - out.ModifyIndex = index for ns, js := range p.Jobs { out.Jobs[ns] = map[string]*Job{} @@ -525,7 +525,7 @@ func (p *CSIPlugin) DeleteJob(job *Job) { // AddPlugin adds a single plugin running on the node. Called from state.NodeUpdate in a // transaction -func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo, index uint64) { +func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) { if info.ControllerInfo != nil { prev, ok := p.Controllers[nodeID] if ok && prev.Healthy { @@ -547,13 +547,11 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo, index uint64) { p.NodesHealthy += 1 } } - - p.ModifyIndex = index } // DeleteNode removes all plugins from the node. Called from state.DeleteNode in a // transaction -func (p *CSIPlugin) DeleteNode(nodeID string, index uint64) { +func (p *CSIPlugin) DeleteNode(nodeID string) { prev, ok := p.Controllers[nodeID] if ok && prev.Healthy { p.ControllersHealthy -= 1 @@ -565,8 +563,6 @@ func (p *CSIPlugin) DeleteNode(nodeID string, index uint64) { p.NodesHealthy -= 1 } delete(p.Nodes, nodeID) - - p.ModifyIndex = index } type CSIPluginListStub struct { diff --git a/nomad/structs/csi_test.go b/nomad/structs/csi_test.go index 7119b7b72..7685b41b4 100644 --- a/nomad/structs/csi_test.go +++ b/nomad/structs/csi_test.go @@ -7,7 +7,7 @@ import ( ) func TestCSIVolumeClaim(t *testing.T) { - vol := NewCSIVolume("") + vol := NewCSIVolume("", 0) vol.AccessMode = CSIVolumeAccessModeMultiNodeSingleWriter vol.Healthy = true diff --git a/scheduler/feasible_test.go b/scheduler/feasible_test.go index f738f2dc7..f5c176526 100644 --- a/scheduler/feasible_test.go +++ b/scheduler/feasible_test.go @@ -281,7 +281,7 @@ func TestCSIVolumeChecker(t *testing.T) { // Create the volume in the state store vid := "volume-id" - vol := structs.NewCSIVolume(vid) + vol := structs.NewCSIVolume(vid, index) vol.PluginID = "foo" vol.Namespace = structs.DefaultNamespace vol.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter diff --git a/scheduler/stack_test.go b/scheduler/stack_test.go index 5b611036e..db12c5820 100644 --- a/scheduler/stack_test.go +++ b/scheduler/stack_test.go @@ -239,7 +239,8 @@ func TestServiceStack_Select_CSI(t *testing.T) { } // Create a volume in the state store - v := structs.NewCSIVolume("foo") + index := uint64(999) + v := structs.NewCSIVolume("foo", index) v.Namespace = structs.DefaultNamespace v.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter v.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem