mirror of
https://github.com/kemko/nomad.git
synced 2026-01-07 02:45:42 +03:00
csi: volume deregister fails for volumes actively in use (#7445)
* nomad/structs/csi: add InUse to CSIVolume * nomad/state/state_store: block volume deregistration for in use vols
This commit is contained in:
@@ -1833,6 +1833,15 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s
|
||||
return fmt.Errorf("volume not found: %s", id)
|
||||
}
|
||||
|
||||
vol, ok := existing.(*structs.CSIVolume)
|
||||
if !ok {
|
||||
return fmt.Errorf("volume row conversion error: %s", id)
|
||||
}
|
||||
|
||||
if vol.InUse() {
|
||||
return fmt.Errorf("volume in use: %s", id)
|
||||
}
|
||||
|
||||
if err = txn.Delete("csi_volumes", existing); err != nil {
|
||||
return fmt.Errorf("volume delete failed: %s: %v", id, err)
|
||||
}
|
||||
|
||||
@@ -2928,24 +2928,6 @@ func TestStateStore_CSIVolume(t *testing.T) {
|
||||
vs = slurp(iter)
|
||||
require.Equal(t, 1, len(vs))
|
||||
|
||||
index++
|
||||
err = state.CSIVolumeDeregister(index, ns, []string{
|
||||
vol1,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
iter, err = state.CSIVolumesByPluginID(ws, ns, "adam")
|
||||
require.NoError(t, err)
|
||||
vs = slurp(iter)
|
||||
require.Equal(t, 0, len(vs))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
iter, err = state.CSIVolumesByNamespace(ws, ns)
|
||||
require.NoError(t, err)
|
||||
vs = slurp(iter)
|
||||
require.Equal(t, 1, len(vs))
|
||||
|
||||
// Claims
|
||||
a0 := &structs.Allocation{ID: uuid.Generate()}
|
||||
a1 := &structs.Allocation{ID: uuid.Generate()}
|
||||
@@ -2973,6 +2955,36 @@ func TestStateStore_CSIVolume(t *testing.T) {
|
||||
require.NoError(t, err)
|
||||
vs = slurp(iter)
|
||||
require.True(t, vs[0].ReadSchedulable())
|
||||
|
||||
// Deregister
|
||||
index++
|
||||
err = state.CSIVolumeDeregister(index, ns, []string{vol0})
|
||||
require.Error(t, err, fmt.Sprintf("volume in use: %s", vol0))
|
||||
|
||||
// release claims to unblock deregister
|
||||
index++
|
||||
err = state.CSIVolumeClaim(index, ns, vol0, a0, u)
|
||||
require.NoError(t, err)
|
||||
index++
|
||||
err = state.CSIVolumeClaim(index, ns, vol0, a1, u)
|
||||
require.NoError(t, err)
|
||||
|
||||
index++
|
||||
err = state.CSIVolumeDeregister(index, ns, []string{vol0})
|
||||
require.NoError(t, err)
|
||||
|
||||
// List, now omitting the deregistered volume
|
||||
ws = memdb.NewWatchSet()
|
||||
iter, err = state.CSIVolumesByPluginID(ws, ns, "minnie")
|
||||
require.NoError(t, err)
|
||||
vs = slurp(iter)
|
||||
require.Equal(t, 0, len(vs))
|
||||
|
||||
ws = memdb.NewWatchSet()
|
||||
iter, err = state.CSIVolumesByNamespace(ws, ns)
|
||||
require.NoError(t, err)
|
||||
vs = slurp(iter)
|
||||
require.Equal(t, 1, len(vs))
|
||||
}
|
||||
|
||||
// TestStateStore_CSIPluginNodes uses the state from jobs, and uses node fingerprinting to update health
|
||||
|
||||
@@ -330,6 +330,12 @@ func (v *CSIVolume) WriteFreeClaims() bool {
|
||||
}
|
||||
}
|
||||
|
||||
// InUse tests whether any allocations are actively using the volume
|
||||
func (v *CSIVolume) InUse() bool {
|
||||
return len(v.ReadAllocs) != 0 ||
|
||||
len(v.WriteAllocs) != 0
|
||||
}
|
||||
|
||||
// Copy returns a copy of the volume, which shares only the Topologies slice
|
||||
func (v *CSIVolume) Copy() *CSIVolume {
|
||||
copy := *v
|
||||
|
||||
Reference in New Issue
Block a user