csi: fix index maintenance for CSIVolume and CSIPlugin tables (#7049)

* state_store: csi volumes/plugins store the index in the txn

* nomad: csi_endpoint_test require index checks need uint64()

* nomad: other tests using int 0 not uint64(0)

* structs: pass index into New, but not other struct methods

* state_store: csi plugin indexes, use new struct interface

* nomad: csi_endpoint_test check index/query meta (on explicit 0)

* structs: NewCSIVolume takes an index arg now

* scheduler/test: NewCSIVolume takes an index arg now

Authored by Lang Martin on 2020-02-03 11:59:00 -05:00
Committed by Tim Gross
parent 5d9f560f59
commit 9f850016bd
9 changed files with 71 additions and 36 deletions
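
The common thread in the diffs below: every state store write that touches the csi_volumes or csi_plugins tables now also bumps that table's entry in memdb's "index" table inside the same transaction, and ModifyIndex is stamped by the state store on a Copy() of the row instead of inside struct methods. A minimal sketch of that pattern, assuming it lives in the nomad package next to state_store.go where IndexEntry is defined; the helper name is made up, not part of the commit:

package nomad

import (
	"fmt"

	memdb "github.com/hashicorp/go-memdb"

	"github.com/hashicorp/nomad/nomad/structs"
)

// upsertCSIVolumeSketch is a hypothetical helper showing the bookkeeping this
// commit enforces: copy the row, set ModifyIndex in the state store, write the
// row, then record the same raft index for the table itself.
func upsertCSIVolumeSketch(txn *memdb.Txn, vol *structs.CSIVolume, index uint64) error {
	v := vol.Copy() // Copy() no longer takes or sets an index
	v.ModifyIndex = index

	if err := txn.Insert("csi_volumes", v); err != nil {
		return fmt.Errorf("volume update failed: %v", err)
	}
	// Without this, blocking queries and QueryMeta.Index for csi_volumes never advance.
	if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
		return fmt.Errorf("index update failed: %v", err)
	}
	return nil
}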

View File

@@ -40,7 +40,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
PluginID: "minnie",
}}
- err := state.CSIVolumeRegister(0, vols)
+ err := state.CSIVolumeRegister(999, vols)
require.NoError(t, err)
// Create the register request
@@ -56,7 +56,7 @@ func TestCSIVolumeEndpoint_Get(t *testing.T) {
var resp structs.CSIVolumeGetResponse
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req, &resp)
require.NoError(t, err)
- require.NotEqual(t, 0, resp.Index)
+ require.Equal(t, uint64(999), resp.Index)
require.Equal(t, vols[0].ID, resp.Volume.ID)
}
@@ -104,7 +104,7 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) {
resp1 := &structs.CSIVolumeRegisterResponse{}
err := msgpackrpc.CallWithCodec(codec, "CSIVolume.Register", req1, resp1)
require.NoError(t, err)
- require.NotEqual(t, 0, resp1.Index)
+ require.NotEqual(t, uint64(0), resp1.Index)
// Get the volume back out
policy = mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess})
@@ -120,7 +120,7 @@ func TestCSIVolumeEndpoint_Register(t *testing.T) {
resp2 := &structs.CSIVolumeGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Get", req2, resp2)
require.NoError(t, err)
- require.NotEqual(t, 0, resp2.Index)
+ require.Equal(t, resp1.Index, resp2.Index)
require.Equal(t, vols[0].ID, resp2.Volume.ID)
// Registration does not update
@@ -190,7 +190,7 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
PluginID: "paddy",
}}
- err := state.CSIVolumeRegister(0, vols)
+ err := state.CSIVolumeRegister(999, vols)
require.NoError(t, err)
var resp structs.CSIVolumeListResponse
@@ -204,7 +204,7 @@ func TestCSIVolumeEndpoint_List(t *testing.T) {
}
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.List", req, &resp)
require.NoError(t, err)
- require.NotEqual(t, 0, resp.Index)
+ require.Equal(t, uint64(999), resp.Index)
require.Equal(t, 2, len(resp.Volumes))
ids := map[string]bool{vols[0].ID: true, vols[1].ID: true}
for _, v := range resp.Volumes {
@@ -280,7 +280,7 @@ func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) {
resp1 := &structs.JobRegisterResponse{}
err := msgpackrpc.CallWithCodec(codec, "Job.Register", req1, resp1)
require.NoError(t, err)
- require.NotEqual(t, 0, resp1.Index)
+ require.NotEqual(t, uint64(0), resp1.Index)
// Get the plugin back out
policy = mock.NamespacePolicy(ns, "", []string{acl.NamespaceCapabilityCSIAccess})
@@ -296,7 +296,8 @@ func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) {
resp2 := &structs.CSIPluginGetResponse{}
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2)
require.NoError(t, err)
- require.NotEqual(t, 0, resp2.Index)
+ // The job is created with a higher index than the plugin, there's an extra raft write
+ require.Greater(t, resp1.Index, resp2.Index)
// List plugins
req3 := &structs.CSIPluginListRequest{
@@ -323,6 +324,7 @@ func TestCSIPluginEndpoint_RegisterViaJob(t *testing.T) {
resp4 := &structs.JobDeregisterResponse{}
err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req4, resp4)
require.NoError(t, err)
+ require.Less(t, resp2.Index, resp4.Index)
// Plugin is missing
err = msgpackrpc.CallWithCodec(codec, "CSIPlugin.Get", req2, resp2)

View File

@@ -2141,7 +2141,7 @@ func TestClientEndpoint_UpdateAlloc(t *testing.T) {
start := time.Now()
err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2)
require.Nil(err)
- require.NotEqual(0, resp2.Index)
+ require.NotEqual(uint64(0), resp2.Index)
if diff := time.Since(start); diff < batchUpdateInterval {
t.Fatalf("too fast: %v", diff)
@@ -3235,7 +3235,7 @@ func TestClientEndpoint_EmitEvents(t *testing.T) {
var resp structs.GenericResponse
err = msgpackrpc.CallWithCodec(codec, "Node.EmitEvents", &req, &resp)
require.Nil(err)
- require.NotEqual(0, resp.Index)
+ require.NotEqual(uint64(0), resp.Index)
// Check for the node in the FSM
ws := memdb.NewWatchSet()

View File

@@ -116,7 +116,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
req.AuthToken = token.SecretID
var resp structs.PeriodicForceResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp))
- assert.NotEqual(0, resp.Index)
+ assert.NotEqual(uint64(0), resp.Index)
// Lookup the evaluation
ws := memdb.NewWatchSet()
@@ -132,7 +132,7 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) {
req.AuthToken = root.SecretID
var resp structs.PeriodicForceResponse
assert.Nil(msgpackrpc.CallWithCodec(codec, "Periodic.Force", req, &resp))
- assert.NotEqual(0, resp.Index)
+ assert.NotEqual(uint64(0), resp.Index)
// Lookup the evaluation
ws := memdb.NewWatchSet()
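
The int 0 versus uint64(0) changes in the three test files above are not cosmetic: testify's equality helpers fall back to reflect.DeepEqual, which treats values of different types as unequal, so the old NotEqual(t, 0, resp.Index) could never fail against a uint64 index. A small self-contained illustration (the test name is made up, not part of the commit):

package nomad

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

// TestIndexAssertionTypes demonstrates why the assertions needed uint64(0):
// an int literal is never DeepEqual to a uint64, so the old checks were no-ops.
func TestIndexAssertionTypes(t *testing.T) {
	var index uint64 // zero value: exactly the bug the assertion is meant to catch

	assert.False(t, assert.ObjectsAreEqual(0, index))        // int vs uint64: never equal
	assert.True(t, assert.ObjectsAreEqual(uint64(0), index)) // same type: NotEqual would now fail, as intended
}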

View File

@@ -954,13 +954,14 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
var plug *structs.CSIPlugin
if raw != nil {
- plug = raw.(*structs.CSIPlugin).Copy(index)
+ plug = raw.(*structs.CSIPlugin).Copy()
} else {
plug = structs.NewCSIPlugin(info.PluginID, index)
plug.ControllerRequired = info.RequiresControllerPlugin
}
- plug.AddPlugin(node.ID, info, index)
+ plug.AddPlugin(node.ID, info)
+ plug.ModifyIndex = index
err = txn.Insert("csi_plugins", plug)
if err != nil {
@@ -984,6 +985,10 @@ func upsertNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
}
}
if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
@@ -1010,8 +1015,9 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
return fmt.Errorf("csi_plugins missing plugin %s", id)
}
- plug := raw.(*structs.CSIPlugin).Copy(index)
- plug.DeleteNode(node.ID, index)
+ plug := raw.(*structs.CSIPlugin).Copy()
+ plug.DeleteNode(node.ID)
+ plug.ModifyIndex = index
if plug.IsEmpty() {
err := txn.Delete("csi_plugins", plug)
@@ -1026,6 +1032,10 @@ func deleteNodeCSIPlugins(txn *memdb.Txn, node *structs.Node, index uint64) erro
}
}
if err := txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
@@ -1644,6 +1654,10 @@ func (s *StateStore) CSIVolumeRegister(index uint64, volumes []*structs.CSIVolum
}
}
if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}
@@ -1710,16 +1724,22 @@ func (s *StateStore) CSIVolumeClaim(index uint64, id string, alloc *structs.Allo
return fmt.Errorf("volume row conversion error")
}
- volume := orig.Copy(index)
+ volume := orig.Copy()
if !volume.Claim(claim, alloc) {
return fmt.Errorf("volume max claim reached")
}
+ volume.ModifyIndex = index
if err = txn.Insert("csi_volumes", volume); err != nil {
return fmt.Errorf("volume update failed: %s: %v", id, err)
}
if err = txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}
@@ -1744,6 +1764,10 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, ids []string) error {
}
}
if err := txn.Insert("index", &IndexEntry{"csi_volumes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
txn.Commit()
return nil
}
@@ -1759,7 +1783,7 @@ func (s *StateStore) upsertJobCSIPlugins(index uint64, job *structs.Job, txn *me
// Append this job to all of them
for _, plug := range plugs {
if plug.CreateIndex != index {
- plug = plug.Copy(index)
+ plug = plug.Copy()
}
plug.AddJob(job)
@@ -1770,6 +1794,10 @@ func (s *StateStore) upsertJobCSIPlugins(index uint64, job *structs.Job, txn *me
}
}
if err = txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
@@ -1819,10 +1847,12 @@ func (s *StateStore) deleteJobCSIPlugins(index uint64, job *structs.Job, txn *me
// Remove this job from each plugin. If the plugin has no jobs left, remove it
for _, plug := range plugs {
if plug.CreateIndex != index {
- plug = plug.Copy(index)
+ plug = plug.Copy()
}
plug.DeleteJob(job)
+ plug.ModifyIndex = index
if plug.IsEmpty() {
err = txn.Delete("csi_plugins", plug)
} else {
@@ -1834,6 +1864,10 @@ func (s *StateStore) deleteJobCSIPlugins(index uint64, job *structs.Job, txn *me
}
}
if err = txn.Insert("index", &IndexEntry{"csi_plugins", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}
return nil
}
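
Read side of the same bookkeeping, for context: the "index" table is keyed by table name, so once the writes above maintain it, the latest csi_volumes raft index is a single lookup. A hedged sketch under the same assumptions as the earlier snippet (hypothetical helper, not part of the commit):

// csiVolumesTableIndex returns the raft index of the last csi_volumes write,
// or zero if the table has never been written.
func csiVolumesTableIndex(txn *memdb.Txn) (uint64, error) {
	raw, err := txn.First("index", "id", "csi_volumes")
	if err != nil {
		return 0, fmt.Errorf("index lookup failed: %v", err)
	}
	if raw == nil {
		return 0, nil
	}
	return raw.(*IndexEntry).Value, nil
}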

View File

@@ -2831,8 +2831,9 @@ func TestStateStore_CSIVolume(t *testing.T) {
state := testStateStore(t)
id0, id1 := uuid.Generate(), uuid.Generate()
+ index := uint64(1000)
- v0 := structs.NewCSIVolume("foo")
+ v0 := structs.NewCSIVolume("foo", index)
v0.ID = id0
v0.Namespace = "default"
v0.PluginID = "minnie"
@@ -2840,7 +2841,8 @@ func TestStateStore_CSIVolume(t *testing.T) {
v0.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v0.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem
v1 := structs.NewCSIVolume("foo")
index++
v1 := structs.NewCSIVolume("foo", index)
v1.ID = id1
v1.Namespace = "default"
v1.PluginID = "adam"

View File

@@ -185,9 +185,11 @@ type CSIVolListStub struct {
}
// NewCSIVolume creates the volume struct. No side-effects
- func NewCSIVolume(pluginID string) *CSIVolume {
+ func NewCSIVolume(pluginID string, index uint64) *CSIVolume {
out := &CSIVolume{
- ID: pluginID,
+ ID:          pluginID,
+ CreateIndex: index,
+ ModifyIndex: index,
}
out.newStructs()
@@ -250,11 +252,10 @@ func (v *CSIVolume) CanWrite() bool {
}
// Copy returns a copy of the volume, which shares only the Topologies slice
- func (v *CSIVolume) Copy(index uint64) *CSIVolume {
+ func (v *CSIVolume) Copy() *CSIVolume {
copy := *v
out := &copy
out.newStructs()
- out.ModifyIndex = index
for k, v := range v.ReadAllocs {
out.ReadAllocs[k] = v
@@ -486,11 +487,10 @@ func (p *CSIPlugin) newStructs() {
p.Nodes = map[string]*CSIInfo{}
}
- func (p *CSIPlugin) Copy(index uint64) *CSIPlugin {
+ func (p *CSIPlugin) Copy() *CSIPlugin {
copy := *p
out := &copy
out.newStructs()
- out.ModifyIndex = index
for ns, js := range p.Jobs {
out.Jobs[ns] = map[string]*Job{}
@@ -525,7 +525,7 @@ func (p *CSIPlugin) DeleteJob(job *Job) {
// AddPlugin adds a single plugin running on the node. Called from state.NodeUpdate in a
// transaction
- func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo, index uint64) {
+ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo) {
if info.ControllerInfo != nil {
prev, ok := p.Controllers[nodeID]
if ok && prev.Healthy {
@@ -547,13 +547,11 @@ func (p *CSIPlugin) AddPlugin(nodeID string, info *CSIInfo, index uint64) {
p.NodesHealthy += 1
}
}
- p.ModifyIndex = index
}
// DeleteNode removes all plugins from the node. Called from state.DeleteNode in a
// transaction
- func (p *CSIPlugin) DeleteNode(nodeID string, index uint64) {
+ func (p *CSIPlugin) DeleteNode(nodeID string) {
prev, ok := p.Controllers[nodeID]
if ok && prev.Healthy {
p.ControllersHealthy -= 1
@@ -565,8 +563,6 @@ func (p *CSIPlugin) DeleteNode(nodeID string, index uint64) {
p.NodesHealthy -= 1
}
delete(p.Nodes, nodeID)
- p.ModifyIndex = index
}
type CSIPluginListStub struct {

View File

@@ -7,7 +7,7 @@ import (
)
func TestCSIVolumeClaim(t *testing.T) {
vol := NewCSIVolume("")
vol := NewCSIVolume("", 0)
vol.AccessMode = CSIVolumeAccessModeMultiNodeSingleWriter
vol.Healthy = true
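
Taken together, the structs changes above move all index bookkeeping out of the struct methods: the constructors take the create index, while Copy, AddPlugin, and DeleteNode no longer touch ModifyIndex. A small hypothetical test sketching the new call pattern (the plugin ID, volume ID, and index values are illustrative):

package structs

import "testing"

// TestCSIVolumeIndexSketch is illustrative only, not part of the commit.
func TestCSIVolumeIndexSketch(t *testing.T) {
	index := uint64(1000)
	v := NewCSIVolume("minnie", index) // stamps CreateIndex and ModifyIndex once
	v.ID = "vol-0"
	v.Namespace = "default"

	// A later write copies first; the caller (normally the state store)
	// advances ModifyIndex explicitly.
	index++
	updated := v.Copy()
	updated.ModifyIndex = index

	if v.ModifyIndex != 1000 || updated.ModifyIndex != 1001 {
		t.Fatalf("expected original at 1000 and copy at 1001, got %d and %d",
			v.ModifyIndex, updated.ModifyIndex)
	}
}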

View File

@@ -281,7 +281,7 @@ func TestCSIVolumeChecker(t *testing.T) {
// Create the volume in the state store
vid := "volume-id"
- vol := structs.NewCSIVolume(vid)
+ vol := structs.NewCSIVolume(vid, index)
vol.PluginID = "foo"
vol.Namespace = structs.DefaultNamespace
vol.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter

View File

@@ -239,7 +239,8 @@ func TestServiceStack_Select_CSI(t *testing.T) {
}
// Create a volume in the state store
v := structs.NewCSIVolume("foo")
index := uint64(999)
v := structs.NewCSIVolume("foo", index)
v.Namespace = structs.DefaultNamespace
v.AccessMode = structs.CSIVolumeAccessModeMultiNodeSingleWriter
v.AttachmentMode = structs.CSIVolumeAttachmentModeFilesystem