From f718c132b4c7a3687277ca4c46f638b0640db2ee Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 4 Apr 2022 10:46:45 -0400 Subject: [PATCH] CSI: volume watcher shutdown fixes (#12439) The volume watcher design was based on deploymentwatcher and drainer, but has an important difference: we don't want to maintain a goroutine for the lifetime of the volume. So we stop the volumewatcher goroutine for a volume when that volume has no more claims to free. But the shutdown races with updates on the parent goroutine, and it's possible to drop updates. Fortunately these updates are picked up on the next core GC job, but we're most likely to hit this race when we're replacing an allocation and that's the time we least want to wait. Wait until the volume has "settled" before stopping this goroutine so that the race between shutdown and the parent goroutine sending on `<-updateCh` is pushed to after the window we most care about quick freeing of claims. * Fixes a resource leak when volumewatchers are no longer needed. The volume is nil and can't ever be started again, so the volume's `watcher` should be removed from the top-level `Watcher`. * De-flakes the GC job test: the test throws an error because the claimed node doesn't exist and is unreachable. This flaked instead of failed because we didn't correctly wait for the first pass through the volumewatcher. Make the GC job wait for the volumewatcher to reach the quiescent timeout window state before running the GC eval under test, so that we're sure the GC job's work isn't being picked up by processing one of the earlier claims. Update the claims used so that we're sure the GC pass won't hit a node unpublish error. * Adds trace logging to unpublish operations --- nomad/core_sched_test.go | 71 ++++++++++++++------- nomad/csi_endpoint.go | 4 +- nomad/volumewatcher/volume_watcher.go | 68 +++++++++++++------- nomad/volumewatcher/volumes_watcher.go | 29 +++++++-- nomad/volumewatcher/volumes_watcher_test.go | 15 ++++- 5 files changed, 132 insertions(+), 55 deletions(-) diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index e5f9a8d83..7441ce466 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2278,10 +2278,10 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { err := store.UpsertNode(structs.MsgTypeTestSetup, index, node) require.NoError(t, err) - // Note that for volume writes in this test we need to use the - // RPCs rather than StateStore methods directly so that the GC - // job's RPC call updates a later index. otherwise the - // volumewatcher won't trigger for the final GC + // *Important*: for volume writes in this test we must use RPCs + // rather than StateStore methods directly, or the blocking query + // in volumewatcher won't get the final update for GC because it's + // watching on a different store at that point // Register a volume vols := []*structs.CSIVolume{{ @@ -2302,11 +2302,11 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { volReq, &structs.CSIVolumeRegisterResponse{}) require.NoError(t, err) - // Create a job with two allocations that claim the volume. + // Create a job with two allocs that claim the volume. // We use two allocs here, one of which is not running, so - // that we can assert that the volumewatcher has made one - // complete pass (and removed the 2nd alloc) before running - // the GC. + // that we can assert the volumewatcher has made one + // complete pass (and removed the 2nd alloc) before we + // run the GC eval := mock.Eval() eval.Status = structs.EvalStatusFailed index++ @@ -2331,6 +2331,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { alloc2.NodeID = node.ID alloc2.ClientStatus = structs.AllocClientStatusComplete + alloc2.DesiredStatus = structs.AllocDesiredStatusStop alloc2.Job = job alloc2.JobID = job.ID alloc2.EvalID = eval.ID @@ -2344,42 +2345,66 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { index++ require.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2})) - // Claim the volume for the alloc req := &structs.CSIVolumeClaimRequest{ - AllocationID: alloc1.ID, - NodeID: node.ID, - VolumeID: volID, - Claim: structs.CSIVolumeClaimWrite, + VolumeID: volID, + AllocationID: alloc1.ID, + NodeID: uuid.Generate(), // doesn't exist so we don't get errors trying to unmount volumes from it + Claim: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + WriteRequest: structs.WriteRequest{ + Namespace: ns, + Region: srv.config.Region, + }, } - req.Namespace = ns - req.Region = srv.config.Region err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{}) - require.NoError(t, err) + require.NoError(t, err, "write claim should succeed") - // Delete allocation and job + req.AllocationID = alloc2.ID + req.State = structs.CSIVolumeClaimStateUnpublishing + + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", + req, &structs.CSIVolumeClaimResponse{}) + require.NoError(t, err, "unpublishing claim should succeed") + + require.Eventually(t, func() bool { + vol, err := store.CSIVolumeByID(ws, ns, volID) + require.NoError(t, err) + return len(vol.WriteClaims) == 1 && + len(vol.WriteAllocs) == 1 && + len(vol.PastClaims) == 0 + }, time.Second*1, 100*time.Millisecond, + "volumewatcher should have released unpublishing claim without GC") + + // At this point we can guarantee that volumewatcher is waiting + // for new work. Delete allocation and job so that the next pass + // thru volumewatcher has more work to do + index, _ = store.LatestIndex() index++ err = store.DeleteJob(index, ns, job.ID) require.NoError(t, err) + index, _ = store.LatestIndex() index++ - err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID, alloc2.ID}) + err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}) require.NoError(t, err) // Create a core scheduler and attempt the volume claim GC snap, err := store.Snapshot() require.NoError(t, err) + core := NewCoreScheduler(srv, snap) + index, _ = snap.LatestIndex() index++ gc := srv.coreJobEval(structs.CoreJobForceGC, index) c := core.(*CoreScheduler) require.NoError(t, c.csiVolumeClaimGC(gc)) - // sending the GC claim will trigger the volumewatcher's normal - // code path. the volumewatcher will hit an error here because - // there's no path to the node, but this is a node-only plugin so - // we accept that the node has been GC'd and there's no point - // holding onto the claim + // the only remaining claim is for a deleted alloc with no path to + // the non-existent node, so volumewatcher will release the + // remaining claim require.Eventually(t, func() bool { vol, _ := store.CSIVolumeByID(ws, ns, volID) return len(vol.WriteClaims) == 0 && diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index e0f364432..e10dfbef9 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -671,6 +671,7 @@ NODE_DETACHED: } RELEASE_CLAIM: + v.logger.Trace("releasing claim", "vol", vol.ID) // advance a CSIVolumeClaimStateControllerDetached claim claim.State = structs.CSIVolumeClaimStateReadyToFree err = v.checkpointClaim(vol, claim) @@ -684,6 +685,7 @@ RELEASE_CLAIM: } func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { + v.logger.Trace("node unpublish", "vol", vol.ID) if claim.AllocationID != "" { err := v.nodeUnpublishVolumeImpl(vol, claim) if err != nil { @@ -757,7 +759,7 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc } func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { - + v.logger.Trace("controller unpublish", "vol", vol.ID) if !vol.ControllerRequired { claim.State = structs.CSIVolumeClaimStateReadyToFree return nil diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go index fe69bca41..82cd76fb8 100644 --- a/nomad/volumewatcher/volume_watcher.go +++ b/nomad/volumewatcher/volume_watcher.go @@ -3,10 +3,12 @@ package volumewatcher import ( "context" "sync" + "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -30,6 +32,11 @@ type volumeWatcher struct { shutdownCtx context.Context // parent context ctx context.Context // own context exitFn context.CancelFunc + deleteFn func() + + // quiescentTimeout is the time we wait until the volume has "settled" + // before stopping the child watcher goroutines + quiescentTimeout time.Duration // updateCh is triggered when there is an updated volume updateCh chan *structs.CSIVolume @@ -43,13 +50,15 @@ type volumeWatcher struct { func newVolumeWatcher(parent *Watcher, vol *structs.CSIVolume) *volumeWatcher { w := &volumeWatcher{ - updateCh: make(chan *structs.CSIVolume, 1), - v: vol, - state: parent.state, - rpc: parent.rpc, - leaderAcl: parent.leaderAcl, - logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace), - shutdownCtx: parent.ctx, + updateCh: make(chan *structs.CSIVolume, 1), + v: vol, + state: parent.state, + rpc: parent.rpc, + leaderAcl: parent.leaderAcl, + logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace), + shutdownCtx: parent.ctx, + deleteFn: func() { parent.remove(vol.ID + vol.Namespace) }, + quiescentTimeout: parent.quiescentTimeout, } // Start the long lived watcher that scans for allocation updates @@ -111,6 +120,9 @@ func (vw *volumeWatcher) watch() { vol := vw.getVolume(vw.v) vw.volumeReap(vol) + timer, stop := helper.NewSafeTimer(vw.quiescentTimeout) + defer stop() + for { select { // TODO(tgross): currently server->client RPC have no cancellation @@ -120,17 +132,28 @@ func (vw *volumeWatcher) watch() { case <-vw.ctx.Done(): return case vol := <-vw.updateCh: - // while we won't make raft writes if we get a stale update, - // we can still fire extra CSI RPC calls if we don't check this - if vol.ModifyIndex >= vw.v.ModifyIndex { - vol = vw.getVolume(vol) - if vol == nil { - return - } - vw.volumeReap(vol) + vol = vw.getVolume(vol) + if vol == nil { + // We stop the goroutine whenever we have no more + // work, but only delete the watcher when the volume + // is gone to avoid racing the blocking query + vw.deleteFn() + vw.Stop() + return } - default: - vw.Stop() // no pending work + vw.volumeReap(vol) + timer.Reset(vw.quiescentTimeout) + case <-timer.C: + // Wait until the volume has "settled" before stopping + // this goroutine so that the race between shutdown and + // the parent goroutine sending on <-updateCh is pushed to + // after the window we most care about quick freeing of + // claims (and the GC job will clean up anything we miss) + vol = vw.getVolume(vol) + if vol == nil { + vw.deleteFn() + } + vw.Stop() return } } @@ -145,9 +168,12 @@ func (vw *volumeWatcher) getVolume(vol *structs.CSIVolume) *structs.CSIVolume { var err error ws := memdb.NewWatchSet() - vol, err = vw.state.CSIVolumeDenormalizePlugins(ws, vol.Copy()) + vol, err = vw.state.CSIVolumeByID(ws, vol.Namespace, vol.ID) if err != nil { - vw.logger.Error("could not query plugins for volume", "error", err) + vw.logger.Error("could not query for volume", "error", err) + return nil + } + if vol == nil { return nil } @@ -168,9 +194,6 @@ func (vw *volumeWatcher) volumeReap(vol *structs.CSIVolume) { if err != nil { vw.logger.Error("error releasing volume claims", "error", err) } - if vw.isUnclaimed(vol) { - vw.Stop() - } } func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool { @@ -230,6 +253,7 @@ func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIV } func (vw *volumeWatcher) unpublish(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { + vw.logger.Trace("unpublishing volume", "alloc", claim.AllocationID) req := &structs.CSIVolumeUnpublishRequest{ VolumeID: vol.ID, Claim: claim, diff --git a/nomad/volumewatcher/volumes_watcher.go b/nomad/volumewatcher/volumes_watcher.go index c8c687add..5ae08e1ff 100644 --- a/nomad/volumewatcher/volumes_watcher.go +++ b/nomad/volumewatcher/volumes_watcher.go @@ -3,6 +3,7 @@ package volumewatcher import ( "context" "sync" + "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" @@ -34,9 +35,15 @@ type Watcher struct { ctx context.Context exitFn context.CancelFunc + // quiescentTimeout is the time we wait until the volume has "settled" + // before stopping the child watcher goroutines + quiescentTimeout time.Duration + wlock sync.RWMutex } +var defaultQuiescentTimeout = time.Minute * 5 + // NewVolumesWatcher returns a volumes watcher that is used to watch // volumes and trigger the scheduler as needed. func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *Watcher { @@ -47,11 +54,12 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W ctx, exitFn := context.WithCancel(context.Background()) return &Watcher{ - rpc: rpc, - logger: logger.Named("volumes_watcher"), - ctx: ctx, - exitFn: exitFn, - leaderAcl: leaderAcl, + rpc: rpc, + logger: logger.Named("volumes_watcher"), + ctx: ctx, + exitFn: exitFn, + leaderAcl: leaderAcl, + quiescentTimeout: defaultQuiescentTimeout, } } @@ -157,10 +165,10 @@ func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (in } // add adds a volume to the watch list -func (w *Watcher) add(d *structs.CSIVolume) error { +func (w *Watcher) add(v *structs.CSIVolume) error { w.wlock.Lock() defer w.wlock.Unlock() - _, err := w.addLocked(d) + _, err := w.addLocked(v) return err } @@ -182,3 +190,10 @@ func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) { w.watchers[v.ID+v.Namespace] = watcher return watcher, nil } + +// removes a volume from the watch list +func (w *Watcher) remove(volID string) { + w.wlock.Lock() + defer w.wlock.Unlock() + delete(w.watchers, volID) +} diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 41da11f82..97a83a9b5 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -23,18 +23,22 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 100 * time.Millisecond watcher.SetEnabled(true, srv.State(), "") plugin := mock.CSIPlugin() node := testNode(plugin, srv.State()) alloc := mock.Alloc() alloc.ClientStatus = structs.AllocClientStatusComplete + vol := testVolume(plugin, alloc, node.ID) index++ err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}) require.NoError(t, err) + // need to have just enough of a volume and claim in place so that + // the watcher doesn't immediately stop and unload itself claim := &structs.CSIVolumeClaim{ Mode: structs.CSIVolumeClaimGC, State: structs.CSIVolumeClaimStateNodeDetached, @@ -62,6 +66,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 100 * time.Millisecond plugin := mock.CSIPlugin() node := testNode(plugin, srv.State()) @@ -120,12 +125,12 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // create a new watcher and enable it to simulate the leadership // transition watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 100 * time.Millisecond watcher.SetEnabled(true, srv.State(), "") require.Eventually(t, func() bool { watcher.wlock.RLock() defer watcher.wlock.RUnlock() - return 1 == len(watcher.watchers) && !watcher.watchers[vol.ID+vol.Namespace].isRunning() }, time.Second, 10*time.Millisecond) @@ -144,6 +149,7 @@ func TestVolumeWatch_StartStop(t *testing.T) { srv.state = state.TestStateStore(t) index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 100 * time.Millisecond watcher.SetEnabled(true, srv.State(), "") require.Equal(t, 0, len(watcher.watchers)) @@ -240,6 +246,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 10 * time.Millisecond watcher.SetEnabled(true, srv.State(), "") require.Equal(t, 0, len(watcher.watchers)) @@ -261,5 +268,9 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) { return 1 == len(watcher.watchers) }, time.Second, 10*time.Millisecond) - require.False(t, watcher.watchers[vol.ID+vol.Namespace].isRunning()) + require.Eventually(t, func() bool { + watcher.wlock.RLock() + defer watcher.wlock.RUnlock() + return !watcher.watchers[vol.ID+vol.Namespace].isRunning() + }, 1*time.Second, 10*time.Millisecond) }