From b588a7bd73b5c0f8607ba70b7ecd6f338e4ff9b6 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 27 Jan 2022 10:05:41 -0500 Subject: [PATCH] csi: ensure that PastClaims are populated with correct mode (#11932) In the client's `(*csiHook) Postrun()` method, we make an unpublish RPC that includes a claim in the `CSIVolumeClaimStateUnpublishing` state and using the mode from the client. But then in the `(*CSIVolume) Unpublish` RPC handler, we query the volume from the state store (because we only get an ID from the client). And when we make the client RPC for the node unpublish step, we use the _current volume's_ view of the mode. If the volume's mode has been changed before the old allocations can have their claims released, then we end up making a CSI RPC that will never succeed. Why does this code path get the mode from the volume and not the claim? Because the claim written by the GC job in `(*CoreScheduler) csiVolumeClaimGC` doesn't have a mode. Instead it just writes a claim in the unpublishing state to ensure the volumewatcher detects a "past claim" change and reaps all the claims on the volumes. Fix this by ensuring that the `CSIVolumeDenormalize` creates past claims for all nil allocations with a correct access mode set. --- nomad/state/state_store.go | 138 ++++++++++++++++++------------------- nomad/state/testing.go | 5 +- 2 files changed, 72 insertions(+), 71 deletions(-) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 7551ad045..952730168 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2194,7 +2194,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st // we return the volume with the plugins denormalized by default, // because the scheduler needs them for feasibility checking - return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy()) + return s.csiVolumeDenormalizePluginsTxn(txn, vol.Copy()) } // CSIVolumesByPluginID looks up csi_volumes by pluginID. Caller should @@ -2326,11 +2326,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s } } - volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy()) + volume, err := s.csiVolumeDenormalizePluginsTxn(txn, orig.Copy()) if err != nil { return err } - volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume) + volume, err = s.csiVolumeDenormalizeTxn(txn, nil, volume) if err != nil { return err } @@ -2414,7 +2414,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s // volSafeToForce checks if the any of the remaining allocations // are in a non-terminal state. func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool { - vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v) + vol, err := s.csiVolumeDenormalizeTxn(txn, nil, v) if err != nil { return false } @@ -2443,15 +2443,12 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs } txn := s.db.ReadTxn() defer txn.Abort() - return s.CSIVolumeDenormalizePluginsTxn(txn, vol) + return s.csiVolumeDenormalizePluginsTxn(txn, vol) } -// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and -// plugins, but without allocations. -// Use this for current volume metadata, handling lists of volumes. -// Use CSIVolumeDenormalize for volumes containing both health and current -// allocations. -func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) { +// csiVolumeDenormalizePluginsTxn implements +// CSIVolumeDenormalizePlugins, inside a transaction. +func (s *StateStore) csiVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) { if vol == nil { return nil, nil } @@ -2484,80 +2481,83 @@ func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVol return vol, nil } -// CSIVolumeDenormalize returns a CSIVolume with allocations +// CSIVolumeDenormalize returns a CSIVolume with its current +// Allocations and Claims, including creating new PastClaims for +// terminal or garbage collected allocations. This ensures we have a +// consistent state. Note that it mutates the original volume and so +// should always be called on a Copy after reading from the state +// store. func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { txn := s.db.ReadTxn() - return s.CSIVolumeDenormalizeTxn(txn, ws, vol) + return s.csiVolumeDenormalizeTxn(txn, ws, vol) } -// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations -func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { +// csiVolumeDenormalizeTxn implements CSIVolumeDenormalize inside a transaction +func (s *StateStore) csiVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { if vol == nil { return nil, nil } - for id := range vol.ReadAllocs { - a, err := s.allocByIDImpl(txn, ws, id) - if err != nil { - return nil, err - } - if a != nil { - vol.ReadAllocs[id] = a - // COMPAT(1.0): the CSIVolumeClaim fields were added - // after 0.11.1, so claims made before that may be - // missing this value. (same for WriteAlloc below) - if _, ok := vol.ReadClaims[id]; !ok { - vol.ReadClaims[id] = &structs.CSIVolumeClaim{ + + // note: denormalize mutates the maps we pass in! + denormalize := func( + currentAllocs map[string]*structs.Allocation, + currentClaims, pastClaims map[string]*structs.CSIVolumeClaim, + fallbackMode structs.CSIVolumeClaimMode) error { + + for id := range currentAllocs { + a, err := s.allocByIDImpl(txn, ws, id) + if err != nil { + return err + } + pastClaim := pastClaims[id] + currentClaim := currentClaims[id] + if currentClaim == nil { + // COMPAT(1.4.0): the CSIVolumeClaim fields were added + // after 0.11.1, so claims made before that may be + // missing this value. No clusters should see this + // anymore, so warn nosily in the logs so that + // operators ask us about it. Remove this block and + // the now-unused fallbackMode parameter, and return + // an error if currentClaim is nil in 1.4.0 + s.logger.Warn("volume was missing claim for allocation", + "volume_id", vol.ID, "alloc", id) + currentClaim = &structs.CSIVolumeClaim{ AllocationID: a.ID, NodeID: a.NodeID, - Mode: structs.CSIVolumeClaimRead, + Mode: fallbackMode, State: structs.CSIVolumeClaimStateTaken, } + currentClaims[id] = currentClaim } - } else if _, ok := vol.PastClaims[id]; !ok { - // ensure that any allocs that have been GC'd since - // our last read are marked as past claims - vol.PastClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: id, - Mode: structs.CSIVolumeClaimRead, - State: structs.CSIVolumeClaimStateUnpublishing, - } - readClaim := vol.ReadClaims[id] - if readClaim != nil { - vol.PastClaims[id].NodeID = readClaim.NodeID + + currentAllocs[id] = a + if a == nil && pastClaim == nil { + // the alloc is garbage collected but nothing has written a PastClaim, + // so create one now + pastClaim = &structs.CSIVolumeClaim{ + AllocationID: id, + NodeID: currentClaim.NodeID, + Mode: currentClaim.Mode, + State: structs.CSIVolumeClaimStateUnpublishing, + AccessMode: currentClaim.AccessMode, + AttachmentMode: currentClaim.AttachmentMode, + } + pastClaims[id] = pastClaim } + } + return nil } - for id := range vol.WriteAllocs { - a, err := s.allocByIDImpl(txn, ws, id) - if err != nil { - return nil, err - } - if a != nil { - vol.WriteAllocs[id] = a - if _, ok := vol.WriteClaims[id]; !ok { - vol.WriteClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: a.ID, - NodeID: a.NodeID, - Mode: structs.CSIVolumeClaimWrite, - State: structs.CSIVolumeClaimStateTaken, - } - } - } else if _, ok := vol.PastClaims[id]; !ok { - // ensure that any allocs that have been GC'd since - // our last read are marked as past claims - - vol.PastClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: id, - Mode: structs.CSIVolumeClaimWrite, - State: structs.CSIVolumeClaimStateUnpublishing, - } - writeClaim := vol.WriteClaims[id] - if writeClaim != nil { - vol.PastClaims[id].NodeID = writeClaim.NodeID - } - - } + err := denormalize(vol.ReadAllocs, vol.ReadClaims, vol.PastClaims, + structs.CSIVolumeClaimRead) + if err != nil { + return nil, err + } + err = denormalize(vol.WriteAllocs, vol.WriteClaims, vol.PastClaims, + structs.CSIVolumeClaimWrite) + if err != nil { + return nil, err } // COMPAT: the AccessMode and AttachmentMode fields were added to claims diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 0d570a95a..c7a2f3e8e 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -221,8 +221,8 @@ func TestBadCSIState(t testing.TB, store *StateStore) error { allocID2 := uuid.Generate() // nil alloc alloc1 := mock.Alloc() - alloc1.ClientStatus = "complete" - alloc1.DesiredStatus = "stop" + alloc1.ClientStatus = structs.AllocClientStatusRunning + alloc1.DesiredStatus = structs.AllocDesiredStatusRun // Insert allocs into the state store err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1}) @@ -303,6 +303,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error { NodesHealthy: 2, NodesExpected: 0, } + vol = vol.Copy() // canonicalize err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) if err != nil {