diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 7551ad045..952730168 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2194,7 +2194,7 @@ func (s *StateStore) CSIVolumeByID(ws memdb.WatchSet, namespace, id string) (*st // we return the volume with the plugins denormalized by default, // because the scheduler needs them for feasibility checking - return s.CSIVolumeDenormalizePluginsTxn(txn, vol.Copy()) + return s.csiVolumeDenormalizePluginsTxn(txn, vol.Copy()) } // CSIVolumesByPluginID looks up csi_volumes by pluginID. Caller should @@ -2326,11 +2326,11 @@ func (s *StateStore) CSIVolumeClaim(index uint64, namespace, id string, claim *s } } - volume, err := s.CSIVolumeDenormalizePluginsTxn(txn, orig.Copy()) + volume, err := s.csiVolumeDenormalizePluginsTxn(txn, orig.Copy()) if err != nil { return err } - volume, err = s.CSIVolumeDenormalizeTxn(txn, nil, volume) + volume, err = s.csiVolumeDenormalizeTxn(txn, nil, volume) if err != nil { return err } @@ -2414,7 +2414,7 @@ func (s *StateStore) CSIVolumeDeregister(index uint64, namespace string, ids []s // volSafeToForce checks if the any of the remaining allocations // are in a non-terminal state. func (s *StateStore) volSafeToForce(txn Txn, v *structs.CSIVolume) bool { - vol, err := s.CSIVolumeDenormalizeTxn(txn, nil, v) + vol, err := s.csiVolumeDenormalizeTxn(txn, nil, v) if err != nil { return false } @@ -2443,15 +2443,12 @@ func (s *StateStore) CSIVolumeDenormalizePlugins(ws memdb.WatchSet, vol *structs } txn := s.db.ReadTxn() defer txn.Abort() - return s.CSIVolumeDenormalizePluginsTxn(txn, vol) + return s.csiVolumeDenormalizePluginsTxn(txn, vol) } -// CSIVolumeDenormalizePluginsTxn returns a CSIVolume with current health and -// plugins, but without allocations. -// Use this for current volume metadata, handling lists of volumes. -// Use CSIVolumeDenormalize for volumes containing both health and current -// allocations. -func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) { +// csiVolumeDenormalizePluginsTxn implements +// CSIVolumeDenormalizePlugins, inside a transaction. +func (s *StateStore) csiVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVolume) (*structs.CSIVolume, error) { if vol == nil { return nil, nil } @@ -2484,80 +2481,83 @@ func (s *StateStore) CSIVolumeDenormalizePluginsTxn(txn Txn, vol *structs.CSIVol return vol, nil } -// CSIVolumeDenormalize returns a CSIVolume with allocations +// CSIVolumeDenormalize returns a CSIVolume with its current +// Allocations and Claims, including creating new PastClaims for +// terminal or garbage collected allocations. This ensures we have a +// consistent state. Note that it mutates the original volume and so +// should always be called on a Copy after reading from the state +// store. func (s *StateStore) CSIVolumeDenormalize(ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { txn := s.db.ReadTxn() - return s.CSIVolumeDenormalizeTxn(txn, ws, vol) + return s.csiVolumeDenormalizeTxn(txn, ws, vol) } -// CSIVolumeDenormalizeTxn populates a CSIVolume with allocations -func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { +// csiVolumeDenormalizeTxn implements CSIVolumeDenormalize inside a transaction +func (s *StateStore) csiVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *structs.CSIVolume) (*structs.CSIVolume, error) { if vol == nil { return nil, nil } - for id := range vol.ReadAllocs { - a, err := s.allocByIDImpl(txn, ws, id) - if err != nil { - return nil, err - } - if a != nil { - vol.ReadAllocs[id] = a - // COMPAT(1.0): the CSIVolumeClaim fields were added - // after 0.11.1, so claims made before that may be - // missing this value. (same for WriteAlloc below) - if _, ok := vol.ReadClaims[id]; !ok { - vol.ReadClaims[id] = &structs.CSIVolumeClaim{ + + // note: denormalize mutates the maps we pass in! + denormalize := func( + currentAllocs map[string]*structs.Allocation, + currentClaims, pastClaims map[string]*structs.CSIVolumeClaim, + fallbackMode structs.CSIVolumeClaimMode) error { + + for id := range currentAllocs { + a, err := s.allocByIDImpl(txn, ws, id) + if err != nil { + return err + } + pastClaim := pastClaims[id] + currentClaim := currentClaims[id] + if currentClaim == nil { + // COMPAT(1.4.0): the CSIVolumeClaim fields were added + // after 0.11.1, so claims made before that may be + // missing this value. No clusters should see this + // anymore, so warn nosily in the logs so that + // operators ask us about it. Remove this block and + // the now-unused fallbackMode parameter, and return + // an error if currentClaim is nil in 1.4.0 + s.logger.Warn("volume was missing claim for allocation", + "volume_id", vol.ID, "alloc", id) + currentClaim = &structs.CSIVolumeClaim{ AllocationID: a.ID, NodeID: a.NodeID, - Mode: structs.CSIVolumeClaimRead, + Mode: fallbackMode, State: structs.CSIVolumeClaimStateTaken, } + currentClaims[id] = currentClaim } - } else if _, ok := vol.PastClaims[id]; !ok { - // ensure that any allocs that have been GC'd since - // our last read are marked as past claims - vol.PastClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: id, - Mode: structs.CSIVolumeClaimRead, - State: structs.CSIVolumeClaimStateUnpublishing, - } - readClaim := vol.ReadClaims[id] - if readClaim != nil { - vol.PastClaims[id].NodeID = readClaim.NodeID + + currentAllocs[id] = a + if a == nil && pastClaim == nil { + // the alloc is garbage collected but nothing has written a PastClaim, + // so create one now + pastClaim = &structs.CSIVolumeClaim{ + AllocationID: id, + NodeID: currentClaim.NodeID, + Mode: currentClaim.Mode, + State: structs.CSIVolumeClaimStateUnpublishing, + AccessMode: currentClaim.AccessMode, + AttachmentMode: currentClaim.AttachmentMode, + } + pastClaims[id] = pastClaim } + } + return nil } - for id := range vol.WriteAllocs { - a, err := s.allocByIDImpl(txn, ws, id) - if err != nil { - return nil, err - } - if a != nil { - vol.WriteAllocs[id] = a - if _, ok := vol.WriteClaims[id]; !ok { - vol.WriteClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: a.ID, - NodeID: a.NodeID, - Mode: structs.CSIVolumeClaimWrite, - State: structs.CSIVolumeClaimStateTaken, - } - } - } else if _, ok := vol.PastClaims[id]; !ok { - // ensure that any allocs that have been GC'd since - // our last read are marked as past claims - - vol.PastClaims[id] = &structs.CSIVolumeClaim{ - AllocationID: id, - Mode: structs.CSIVolumeClaimWrite, - State: structs.CSIVolumeClaimStateUnpublishing, - } - writeClaim := vol.WriteClaims[id] - if writeClaim != nil { - vol.PastClaims[id].NodeID = writeClaim.NodeID - } - - } + err := denormalize(vol.ReadAllocs, vol.ReadClaims, vol.PastClaims, + structs.CSIVolumeClaimRead) + if err != nil { + return nil, err + } + err = denormalize(vol.WriteAllocs, vol.WriteClaims, vol.PastClaims, + structs.CSIVolumeClaimWrite) + if err != nil { + return nil, err } // COMPAT: the AccessMode and AttachmentMode fields were added to claims diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 0d570a95a..c7a2f3e8e 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -221,8 +221,8 @@ func TestBadCSIState(t testing.TB, store *StateStore) error { allocID2 := uuid.Generate() // nil alloc alloc1 := mock.Alloc() - alloc1.ClientStatus = "complete" - alloc1.DesiredStatus = "stop" + alloc1.ClientStatus = structs.AllocClientStatusRunning + alloc1.DesiredStatus = structs.AllocDesiredStatusRun // Insert allocs into the state store err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1}) @@ -303,6 +303,7 @@ func TestBadCSIState(t testing.TB, store *StateStore) error { NodesHealthy: 2, NodesExpected: 0, } + vol = vol.Copy() // canonicalize err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) if err != nil {