csi: ensure that PastClaims are populated with correct mode (#11932)

In the client's `(*csiHook) Postrun()` method, we make an unpublish
RPC that includes a claim in the `CSIVolumeClaimStateUnpublishing`
state, using the mode from the client. But then in the
`(*CSIVolume) Unpublish` RPC handler, we query the volume from the
state store (because we only get an ID from the client). And when we
make the client RPC for the node unpublish step, we use the _current
volume's_ view of the mode. If the volume's mode has been changed
before the old allocations can have their claims released, then we end
up making a CSI RPC that will never succeed.
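
To make the mismatch concrete, here is a minimal, self-contained Go sketch. The types and helpers below are illustrative stand-ins, not Nomad's real API:

package main

import "fmt"

type claimMode string

const (
	modeRead  claimMode = "read"
	modeWrite claimMode = "write"
)

// volume stands in for the server's view of the volume; its Mode can be
// changed by a new registration while old claims are still outstanding.
type volume struct{ Mode claimMode }

// claim stands in for the claim the allocation originally made.
type claim struct{ Mode claimMode }

// unpublishModeFromVolume mirrors the buggy behavior described above: it
// trusts the volume's current mode, so it reports "write" even though the
// alloc mounted the volume as "read", and the node unpublish RPC built from
// it can never succeed.
func unpublishModeFromVolume(v volume, c claim) claimMode { return v.Mode }

// unpublishModeFromClaim is the behavior the fix moves toward: use the mode
// recorded on the claim itself.
func unpublishModeFromClaim(v volume, c claim) claimMode { return c.Mode }

func main() {
	v := volume{Mode: modeWrite} // mode was changed after the claim was made
	c := claim{Mode: modeRead}   // the alloc actually claimed read

	fmt.Println("from volume:", unpublishModeFromVolume(v, c)) // write (wrong)
	fmt.Println("from claim: ", unpublishModeFromClaim(v, c))  // read (right)
}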

Why does this code path get the mode from the volume and not the
claim? Because the claim written by the GC job in `(*CoreScheduler)
csiVolumeClaimGC` doesn't have a mode. Instead it just writes a claim
in the unpublishing state to ensure the volumewatcher detects a "past
claim" change and reaps all the claims on the volumes.

Fix this by ensuring that `CSIVolumeDenormalize` creates past
claims for all nil allocations with the correct access mode set.
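
The signatures in the diff below suggest the intended calling pattern: read the volume from state, then denormalize a copy, because denormalization mutates the claim maps on the volume it is given. A hypothetical caller is sketched here; the helper name is ours, but `CSIVolumeByID`, `Copy`, and `CSIVolumeDenormalize` are the methods shown in this commit:

import (
	"github.com/hashicorp/go-memdb"

	"github.com/hashicorp/nomad/nomad/state"
	"github.com/hashicorp/nomad/nomad/structs"
)

// volumeWithClaims reads a volume and returns it with its allocations and
// claims denormalized, working on a copy so the object held by the state
// store is not mutated.
func volumeWithClaims(store *state.StateStore, ws memdb.WatchSet, ns, id string) (*structs.CSIVolume, error) {
	vol, err := store.CSIVolumeByID(ws, ns, id)
	if err != nil || vol == nil {
		return nil, err
	}
	return store.CSIVolumeDenormalize(ws, vol.Copy())
}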
Author: Tim Gross
Date: 2022-01-27 10:05:41 -05:00
Committed by: GitHub
Parent: d0624fc09f
Commit: b588a7bd73
2 changed files with 72 additions and 71 deletions


@@ -2194,7 +2194,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st
 	// we return the volume with the plugins denormalized by default,
 	// because the scheduler needs them for feasibility checking
-	return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy())
+	return s.csiVolumeDenormalizePluginsTxn(txn, vol.Copy())
 }
 
 // CSIVolumesByPluginID looks up csi_volumes by pluginID. Caller should
@@ -2326,11 +2326,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s
 		}
 	}
-	volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy())
+	volume, err := s.csiVolumeDenormalizePluginsTxn(txn, orig.Copy())
 	if err != nil {
 		return err
 	}
-	volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume)
+	volume, err = s.csiVolumeDenormalizeTxn(txn, nil, volume)
 	if err != nil {
 		return err
 	}
@@ -2414,7 +2414,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s
 // volSafeToForce checks if the any of the remaining allocations
 // are in a non-terminal state.
 func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool {
-	vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v)
+	vol, err := s.csiVolumeDenormalizeTxn(txn, nil, v)
 	if err != nil {
 		return false
 	}
@@ -2443,15 +2443,12 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs
 	}
 	txn := s.db.ReadTxn()
 	defer txn.Abort()
-	return s.CSIVolumeDenormalizePluginsTxn(txn, vol)
+	return s.csiVolumeDenormalizePluginsTxn(txn, vol)
 }
 
-// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and
-// plugins, but without allocations.
-// Use this for current volume metadata, handling lists of volumes.
-// Use CSIVolumeDenormalize for volumes containing both health and current
-// allocations.
-func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
+// csiVolumeDenormalizePluginsTxn implements
+// CSIVolumeDenormalizePlugins, inside a transaction.
+func (s *StateStore) csiVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
 	if vol == nil {
 		return nil, nil
 	}
@@ -2484,80 +2481,83 @@ func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVol
 	return vol, nil
 }
 
-// CSIVolumeDenormalize returns a CSIVolume with allocations
+// CSIVolumeDenormalize returns a CSIVolume with its current
+// Allocations and Claims, including creating new PastClaims for
+// terminal or garbage collected allocations. This ensures we have a
+// consistent state. Note that it mutates the original volume and so
+// should always be called on a Copy after reading from the state
+// store.
 func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
 	txn := s.db.ReadTxn()
-	return s.CSIVolumeDenormalizeTxn(txn, ws, vol)
+	return s.csiVolumeDenormalizeTxn(txn, ws, vol)
 }
 
-// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations
-func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
+// csiVolumeDenormalizeTxn implements CSIVolumeDenormalize inside a transaction
+func (s *StateStore) csiVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) {
 	if vol == nil {
 		return nil, nil
 	}
-	for id := range vol.ReadAllocs {
-		a, err := s.allocByIDImpl(txn, ws, id)
-		if err != nil {
-			return nil, err
-		}
-		if a != nil {
-			vol.ReadAllocs[id] = a
-			// COMPAT(1.0): the CSIVolumeClaim fields were added
-			// after 0.11.1, so claims made before that may be
-			// missing this value. (same for WriteAlloc below)
-			if _, ok := vol.ReadClaims[id]; !ok {
-				vol.ReadClaims[id] = &structs.CSIVolumeClaim{
+	// note: denormalize mutates the maps we pass in!
+	denormalize := func(
+		currentAllocs map[string]*structs.Allocation,
+		currentClaims, pastClaims map[string]*structs.CSIVolumeClaim,
+		fallbackMode structs.CSIVolumeClaimMode) error {
+		for id := range currentAllocs {
+			a, err := s.allocByIDImpl(txn, ws, id)
+			if err != nil {
+				return err
+			}
+			pastClaim := pastClaims[id]
+			currentClaim := currentClaims[id]
+			if currentClaim == nil {
+				// COMPAT(1.4.0): the CSIVolumeClaim fields were added
+				// after 0.11.1, so claims made before that may be
+				// missing this value. No clusters should see this
+				// anymore, so warn nosily in the logs so that
+				// operators ask us about it. Remove this block and
+				// the now-unused fallbackMode parameter, and return
+				// an error if currentClaim is nil in 1.4.0
+				s.logger.Warn("volume was missing claim for allocation",
+					"volume_id", vol.ID, "alloc", id)
+				currentClaim = &structs.CSIVolumeClaim{
 					AllocationID: a.ID,
 					NodeID:       a.NodeID,
-					Mode:         structs.CSIVolumeClaimRead,
+					Mode:         fallbackMode,
 					State:        structs.CSIVolumeClaimStateTaken,
 				}
+				currentClaims[id] = currentClaim
 			}
-		} else if _, ok := vol.PastClaims[id]; !ok {
-			// ensure that any allocs that have been GC'd since
-			// our last read are marked as past claims
-			vol.PastClaims[id] = &structs.CSIVolumeClaim{
-				AllocationID: id,
-				Mode:         structs.CSIVolumeClaimRead,
-				State:        structs.CSIVolumeClaimStateUnpublishing,
-			}
-			readClaim := vol.ReadClaims[id]
-			if readClaim != nil {
-				vol.PastClaims[id].NodeID = readClaim.NodeID
+			currentAllocs[id] = a
+			if a == nil && pastClaim == nil {
+				// the alloc is garbage collected but nothing has written a PastClaim,
+				// so create one now
+				pastClaim = &structs.CSIVolumeClaim{
+					AllocationID:   id,
+					NodeID:         currentClaim.NodeID,
+					Mode:           currentClaim.Mode,
+					State:          structs.CSIVolumeClaimStateUnpublishing,
+					AccessMode:     currentClaim.AccessMode,
+					AttachmentMode: currentClaim.AttachmentMode,
+				}
+				pastClaims[id] = pastClaim
 			}
 		}
+		return nil
 	}
 
-	for id := range vol.WriteAllocs {
-		a, err := s.allocByIDImpl(txn, ws, id)
-		if err != nil {
-			return nil, err
-		}
-		if a != nil {
-			vol.WriteAllocs[id] = a
-			if _, ok := vol.WriteClaims[id]; !ok {
-				vol.WriteClaims[id] = &structs.CSIVolumeClaim{
-					AllocationID: a.ID,
-					NodeID:       a.NodeID,
-					Mode:         structs.CSIVolumeClaimWrite,
-					State:        structs.CSIVolumeClaimStateTaken,
-				}
-			}
-		} else if _, ok := vol.PastClaims[id]; !ok {
-			// ensure that any allocs that have been GC'd since
-			// our last read are marked as past claims
-			vol.PastClaims[id] = &structs.CSIVolumeClaim{
-				AllocationID: id,
-				Mode:         structs.CSIVolumeClaimWrite,
-				State:        structs.CSIVolumeClaimStateUnpublishing,
-			}
-			writeClaim := vol.WriteClaims[id]
-			if writeClaim != nil {
-				vol.PastClaims[id].NodeID = writeClaim.NodeID
-			}
-		}
+	err := denormalize(vol.ReadAllocs, vol.ReadClaims, vol.PastClaims,
+		structs.CSIVolumeClaimRead)
+	if err != nil {
+		return nil, err
+	}
+	err = denormalize(vol.WriteAllocs, vol.WriteClaims, vol.PastClaims,
+		structs.CSIVolumeClaimWrite)
+	if err != nil {
+		return nil, err
 	}
 
 	// COMPAT: the AccessMode and AttachmentMode fields were added to claims


@@ -221,8 +221,8 @@ func TestBadCSIState(t testing.TB, store *StateStore) error {
 	allocID2 := uuid.Generate() // nil alloc
 
 	alloc1 := mock.Alloc()
-	alloc1.ClientStatus = "complete"
-	alloc1.DesiredStatus = "stop"
+	alloc1.ClientStatus = structs.AllocClientStatusRunning
+	alloc1.DesiredStatus = structs.AllocDesiredStatusRun
 
 	// Insert allocs into the state store
 	err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
@@ -303,6 +303,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error {
 		NodesHealthy:  2,
 		NodesExpected: 0,
 	}
+	vol = vol.Copy() // canonicalize
 
 	err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
 	if err != nil {