diff --git a/.changelog/25428.txt b/.changelog/25428.txt
new file mode 100644
index 000000000..f9edd44b7
--- /dev/null
+++ b/.changelog/25428.txt
@@ -0,0 +1,3 @@
+```release-note:bug
+csi: Fixed a bug where cleaning up volume claims on GC'd nodes would cause errors on the leader
+```
diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go
index 44ca7479c..1370f3463 100644
--- a/nomad/csi_endpoint.go
+++ b/nomad/csi_endpoint.go
@@ -537,7 +537,7 @@ func (v *CSIVolume) controllerPublishVolume(req *structs.CSIVolumeClaimRequest,
 		return err
 	}
 	if targetNode == nil {
-		return fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, alloc.NodeID)
+		return fmt.Errorf("%w %s", structs.ErrUnknownNode, alloc.NodeID)
 	}
 
 	// if the RPC is sent by a client node, it may not know the claim's
@@ -925,6 +925,14 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
 	if claim.ExternalNodeID == "" {
 		externalNodeID, err := v.lookupExternalNodeID(vol, claim)
 		if err != nil {
+			// if the node has been GC'd, there's no path for us to ever send
+			// the controller detach, so assume the node is gone
+			if errors.Is(err, structs.ErrUnknownNode) {
+				v.logger.Trace("controller detach skipped for missing node", "vol", vol.ID)
+				claim.State = structs.CSIVolumeClaimStateReadyToFree
+				return v.checkpointClaim(vol, claim)
+			}
+
 			return fmt.Errorf("missing external node ID: %v", err)
 		}
 		claim.ExternalNodeID = externalNodeID
@@ -978,7 +986,7 @@ func (v *CSIVolume) lookupExternalNodeID(vol *structs.CSIVolume, claim *structs.
 		return "", err
 	}
 	if targetNode == nil {
-		return "", fmt.Errorf("%s: %s", structs.ErrUnknownNodePrefix, claim.NodeID)
+		return "", fmt.Errorf("%w %s", structs.ErrUnknownNode, claim.NodeID)
 	}
 
 	// get the storage provider's ID for the client node (not
diff --git a/nomad/csi_endpoint_test.go b/nomad/csi_endpoint_test.go
index f28fac011..971d403c3 100644
--- a/nomad/csi_endpoint_test.go
+++ b/nomad/csi_endpoint_test.go
@@ -615,20 +615,23 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 		endState       structs.CSIVolumeClaimState
 		nodeID         string
 		otherNodeID    string
+		externalNodeID string
 		expectedErrMsg string
 	}
 	testCases := []tc{
 		{
-			name:          "success",
-			startingState: structs.CSIVolumeClaimStateControllerDetached,
-			nodeID:        node.ID,
-			otherNodeID:   uuid.Generate(),
+			name:           "success",
+			startingState:  structs.CSIVolumeClaimStateControllerDetached,
+			nodeID:         node.ID,
+			otherNodeID:    uuid.Generate(),
+			externalNodeID: "i-example",
 		},
 		{
-			name:          "non-terminal allocation on same node",
-			startingState: structs.CSIVolumeClaimStateNodeDetached,
-			nodeID:        node.ID,
-			otherNodeID:   node.ID,
+			name:           "non-terminal allocation on same node",
+			startingState:  structs.CSIVolumeClaimStateNodeDetached,
+			nodeID:         node.ID,
+			otherNodeID:    node.ID,
+			externalNodeID: "i-example",
 		},
 		{
 			name:           "unpublish previously detached node",
@@ -637,6 +640,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 			expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
 			nodeID:         node.ID,
 			otherNodeID:    uuid.Generate(),
+			externalNodeID: "i-example",
 		},
 		{
 			name:           "unpublish claim on garbage collected node",
 			startingState:  structs.CSIVolumeClaimStateTaken,
 			endState:       structs.CSIVolumeClaimStateNodeDetached,
@@ -645,6 +649,15 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 			expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
 			nodeID:         uuid.Generate(),
 			otherNodeID:    uuid.Generate(),
+			externalNodeID: "i-example",
+		},
+		{
+			name:           "unpublish claim on garbage collected node missing external ID",
+			startingState:  structs.CSIVolumeClaimStateTaken,
+			endState:       structs.CSIVolumeClaimStateNodeDetached,
+			nodeID:         uuid.Generate(),
+			otherNodeID:    uuid.Generate(),
+			externalNodeID: "",
 		},
 		{
 			name:           "first unpublish",
 			startingState:  structs.CSIVolumeClaimStateTaken,
 			endState:       structs.CSIVolumeClaimStateNodeDetached,
@@ -653,6 +666,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 			expectedErrMsg: "could not detach from controller: controller detach volume: No path to node",
 			nodeID:         node.ID,
 			otherNodeID:    uuid.Generate(),
+			externalNodeID: "i-example",
 		},
 	}
 
@@ -693,7 +707,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 			claim := &structs.CSIVolumeClaim{
 				AllocationID:   alloc.ID,
 				NodeID:         tc.nodeID,
-				ExternalNodeID: "i-example",
+				ExternalNodeID: tc.externalNodeID,
 				Mode:           structs.CSIVolumeClaimRead,
 			}
 
@@ -708,7 +722,7 @@ func TestCSIVolumeEndpoint_Unpublish(t *testing.T) {
 			otherClaim := &structs.CSIVolumeClaim{
 				AllocationID:   otherAlloc.ID,
 				NodeID:         tc.otherNodeID,
-				ExternalNodeID: "i-example",
+				ExternalNodeID: tc.externalNodeID,
 				Mode:           structs.CSIVolumeClaimRead,
 			}
 
diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go
index bf1d9f768..f53924155 100644
--- a/nomad/state/state_store.go
+++ b/nomad/state/state_store.go
@@ -3021,6 +3021,7 @@ func (s *StateStore) csiVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st
 				// so create one now
 				pastClaim = &structs.CSIVolumeClaim{
 					AllocationID:   id,
+					ExternalNodeID: currentClaim.ExternalNodeID,
 					NodeID:         currentClaim.NodeID,
 					Mode:           currentClaim.Mode,
 					State:          structs.CSIVolumeClaimStateUnpublishing,
diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go
index efa29505f..114aa7aa3 100644
--- a/nomad/volumewatcher/volume_watcher.go
+++ b/nomad/volumewatcher/volume_watcher.go
@@ -14,6 +14,7 @@ import (
 	"github.com/hashicorp/nomad/helper"
 	"github.com/hashicorp/nomad/nomad/state"
 	"github.com/hashicorp/nomad/nomad/structs"
+	"golang.org/x/time/rate"
 )
 
 // volumeWatcher is used to watch a single volume and trigger the
@@ -41,6 +42,9 @@ type volumeWatcher struct {
 	// before stopping the child watcher goroutines
 	quiescentTimeout time.Duration
 
+	// limiter slows the rate at which we can attempt to clean up a given volume
+	limiter *rate.Limiter
+
 	// updateCh is triggered when there is an updated volume
 	updateCh chan *structs.CSIVolume
 
@@ -62,6 +66,7 @@ func newVolumeWatcher(parent *Watcher, vol *structs.CSIVolume) *volumeWatcher {
 		shutdownCtx:      parent.ctx,
 		deleteFn:         func() { parent.remove(vol.ID + vol.Namespace) },
 		quiescentTimeout: parent.quiescentTimeout,
+		limiter:          rate.NewLimiter(rate.Limit(1), 3),
 	}
 
 	// Start the long lived watcher that scans for allocation updates
@@ -76,7 +81,7 @@ func (vw *volumeWatcher) Notify(v *structs.CSIVolume) {
 	}
 	select {
 	case vw.updateCh <- v:
-	case <-vw.shutdownCtx.Done(): // prevent deadlock if we stopped
+	default: // don't block on full notification channel
 	}
 }
 
@@ -123,6 +128,10 @@ func (vw *volumeWatcher) watch() {
 		case <-vw.shutdownCtx.Done():
 			return
 		case vol := <-vw.updateCh:
+			err := vw.limiter.Wait(vw.shutdownCtx)
+			if err != nil {
+				return // only returns error on context close
+			}
 			vol = vw.getVolume(vol)
 			if vol == nil {
 				return
@@ -193,44 +202,6 @@ func (vw *volumeWatcher) volumeReapImpl(vol *structs.CSIVolume) error {
 	return result.ErrorOrNil()
 }
 
-func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIVolume {
-
-	collect := func(allocs map[string]*structs.Allocation,
-		claims map[string]*structs.CSIVolumeClaim) {
-
-		for allocID, alloc := range allocs {
-			if alloc == nil {
-				_, exists := vol.PastClaims[allocID]
-				if !exists {
-					vol.PastClaims[allocID] = &structs.CSIVolumeClaim{
-						AllocationID: allocID,
-						State:        structs.CSIVolumeClaimStateReadyToFree,
-					}
-				}
-			} else if alloc.Terminated() {
-				// don't overwrite the PastClaim if we've seen it before,
-				// so that we can track state between subsequent calls
-				_, exists := vol.PastClaims[allocID]
-				if !exists {
-					claim, ok := claims[allocID]
-					if !ok {
-						claim = &structs.CSIVolumeClaim{
-							AllocationID: allocID,
-							NodeID:       alloc.NodeID,
-						}
-					}
-					claim.State = structs.CSIVolumeClaimStateTaken
-					vol.PastClaims[allocID] = claim
-				}
-			}
-		}
-	}
-
-	collect(vol.ReadAllocs, vol.ReadClaims)
-	collect(vol.WriteAllocs, vol.WriteClaims)
-	return vol
-}
-
 func (vw *volumeWatcher) unpublish(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
 	vw.logger.Trace("unpublishing volume", "alloc", claim.AllocationID)
 	req := &structs.CSIVolumeUnpublishRequest{
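A note on the pattern the csi_endpoint.go hunks rely on: replacing the string-prefix error (structs.ErrUnknownNodePrefix) with a sentinel wrapped via %w records the sentinel in the error chain, which is what lets the new errors.Is check in controllerUnpublishVolume detect a GC'd node. The fallthrough still formats with %v ("missing external node ID: %v"), which flattens the chain, so the errors.Is check has to happen before that point. A minimal, self-contained sketch of the mechanics; ErrUnknownNode and lookupExternalNodeID here are local stand-ins, not Nomad's definitions:

```go
package main

import (
	"errors"
	"fmt"
)

// local stand-in for the structs.ErrUnknownNode sentinel
var ErrUnknownNode = errors.New("unknown node")

// lookupExternalNodeID mimics the failure mode in the diff: the claim's
// node has been garbage-collected, so the lookup wraps the sentinel with %w
func lookupExternalNodeID(nodeID string) (string, error) {
	return "", fmt.Errorf("%w %s", ErrUnknownNode, nodeID)
}

func main() {
	_, err := lookupExternalNodeID("4b2099de")

	// errors.Is walks the %w chain, so it matches regardless of how the
	// message is formatted; a string-prefix comparison on err.Error()
	// would break as soon as another layer rewrote the message
	if errors.Is(err, ErrUnknownNode) {
		fmt.Println("node is gone; skip controller detach:", err)
		return
	}

	// wrapping with %v (as the fallthrough in the diff does) erases the
	// chain, so errors.Is would no longer match past this point
	err = fmt.Errorf("missing external node ID: %v", err)
	fmt.Println(err)
}
```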
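The volume_watcher.go hunks combine two standard Go techniques: Notify switches from a blocking send to a non-blocking send with a default case (dropping a notification is safe because the watcher re-reads volume state on its next pass), and the watch loop adds a token-bucket rate.Limiter so a volume whose cleanup keeps failing is retried at most once per second after an initial burst of three. A self-contained sketch of how the two interact, with hypothetical channel size and timings rather than Nomad's:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	updateCh := make(chan int, 1)

	// same shape as the rate.NewLimiter(rate.Limit(1), 3) call in the diff:
	// an initial burst of 3 tokens, then a steady 1 token per second
	limiter := rate.NewLimiter(rate.Limit(1), 3)

	// consumer: plays the role of the watch() loop
	go func() {
		for {
			select {
			case <-ctx.Done():
				return
			case v := <-updateCh:
				// Wait blocks until a token is available; it only
				// returns an error when ctx is canceled
				if err := limiter.Wait(ctx); err != nil {
					return
				}
				fmt.Printf("%s processed update %d\n",
					time.Now().Format("15:04:05.000"), v)
			}
		}
	}()

	// producer: plays the role of Notify(), dropping rather than blocking
	for i := 0; i < 10; i++ {
		select {
		case updateCh <- i:
		default: // channel full; the watcher re-reads state on its next pass
		}
		time.Sleep(100 * time.Millisecond)
	}

	<-ctx.Done()
}
```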