diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index e5f9a8d83..7441ce466 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2278,10 +2278,10 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { err := store.UpsertNode(structs.MsgTypeTestSetup, index, node) require.NoError(t, err) - // Note that for volume writes in this test we need to use the - // RPCs rather than StateStore methods directly so that the GC - // job's RPC call updates a later index. otherwise the - // volumewatcher won't trigger for the final GC + // *Important*: for volume writes in this test we must use RPCs + // rather than StateStore methods directly, or the blocking query + // in volumewatcher won't get the final update for GC because it's + // watching on a different store at that point // Register a volume vols := []*structs.CSIVolume{{ @@ -2302,11 +2302,11 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { volReq, &structs.CSIVolumeRegisterResponse{}) require.NoError(t, err) - // Create a job with two allocations that claim the volume. + // Create a job with two allocs that claim the volume. // We use two allocs here, one of which is not running, so - // that we can assert that the volumewatcher has made one - // complete pass (and removed the 2nd alloc) before running - // the GC. + // that we can assert the volumewatcher has made one + // complete pass (and removed the 2nd alloc) before we + // run the GC eval := mock.Eval() eval.Status = structs.EvalStatusFailed index++ @@ -2331,6 +2331,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { alloc2.NodeID = node.ID alloc2.ClientStatus = structs.AllocClientStatusComplete + alloc2.DesiredStatus = structs.AllocDesiredStatusStop alloc2.Job = job alloc2.JobID = job.ID alloc2.EvalID = eval.ID @@ -2344,42 +2345,66 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { index++ require.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2})) - // Claim the volume for the alloc req := &structs.CSIVolumeClaimRequest{ - AllocationID: alloc1.ID, - NodeID: node.ID, - VolumeID: volID, - Claim: structs.CSIVolumeClaimWrite, + VolumeID: volID, + AllocationID: alloc1.ID, + NodeID: uuid.Generate(), // doesn't exist so we don't get errors trying to unmount volumes from it + Claim: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + WriteRequest: structs.WriteRequest{ + Namespace: ns, + Region: srv.config.Region, + }, } - req.Namespace = ns - req.Region = srv.config.Region err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{}) - require.NoError(t, err) + require.NoError(t, err, "write claim should succeed") - // Delete allocation and job + req.AllocationID = alloc2.ID + req.State = structs.CSIVolumeClaimStateUnpublishing + + err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim", + req, &structs.CSIVolumeClaimResponse{}) + require.NoError(t, err, "unpublishing claim should succeed") + + require.Eventually(t, func() bool { + vol, err := store.CSIVolumeByID(ws, ns, volID) + require.NoError(t, err) + return len(vol.WriteClaims) == 1 && + len(vol.WriteAllocs) == 1 && + len(vol.PastClaims) == 0 + }, time.Second*1, 100*time.Millisecond, + "volumewatcher should have released unpublishing claim without GC") + + // At this point we can guarantee that volumewatcher is waiting + // for new work. Delete allocation and job so that the next pass + // thru volumewatcher has more work to do + index, _ = store.LatestIndex() index++ err = store.DeleteJob(index, ns, job.ID) require.NoError(t, err) + index, _ = store.LatestIndex() index++ - err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID, alloc2.ID}) + err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID}) require.NoError(t, err) // Create a core scheduler and attempt the volume claim GC snap, err := store.Snapshot() require.NoError(t, err) + core := NewCoreScheduler(srv, snap) + index, _ = snap.LatestIndex() index++ gc := srv.coreJobEval(structs.CoreJobForceGC, index) c := core.(*CoreScheduler) require.NoError(t, c.csiVolumeClaimGC(gc)) - // sending the GC claim will trigger the volumewatcher's normal - // code path. the volumewatcher will hit an error here because - // there's no path to the node, but this is a node-only plugin so - // we accept that the node has been GC'd and there's no point - // holding onto the claim + // the only remaining claim is for a deleted alloc with no path to + // the non-existent node, so volumewatcher will release the + // remaining claim require.Eventually(t, func() bool { vol, _ := store.CSIVolumeByID(ws, ns, volID) return len(vol.WriteClaims) == 0 && diff --git a/nomad/csi_endpoint.go b/nomad/csi_endpoint.go index e0f364432..e10dfbef9 100644 --- a/nomad/csi_endpoint.go +++ b/nomad/csi_endpoint.go @@ -671,6 +671,7 @@ NODE_DETACHED: } RELEASE_CLAIM: + v.logger.Trace("releasing claim", "vol", vol.ID) // advance a CSIVolumeClaimStateControllerDetached claim claim.State = structs.CSIVolumeClaimStateReadyToFree err = v.checkpointClaim(vol, claim) @@ -684,6 +685,7 @@ RELEASE_CLAIM: } func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { + v.logger.Trace("node unpublish", "vol", vol.ID) if claim.AllocationID != "" { err := v.nodeUnpublishVolumeImpl(vol, claim) if err != nil { @@ -757,7 +759,7 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc } func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { - + v.logger.Trace("controller unpublish", "vol", vol.ID) if !vol.ControllerRequired { claim.State = structs.CSIVolumeClaimStateReadyToFree return nil diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go index fe69bca41..82cd76fb8 100644 --- a/nomad/volumewatcher/volume_watcher.go +++ b/nomad/volumewatcher/volume_watcher.go @@ -3,10 +3,12 @@ package volumewatcher import ( "context" "sync" + "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" multierror "github.com/hashicorp/go-multierror" + "github.com/hashicorp/nomad/helper" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" ) @@ -30,6 +32,11 @@ type volumeWatcher struct { shutdownCtx context.Context // parent context ctx context.Context // own context exitFn context.CancelFunc + deleteFn func() + + // quiescentTimeout is the time we wait until the volume has "settled" + // before stopping the child watcher goroutines + quiescentTimeout time.Duration // updateCh is triggered when there is an updated volume updateCh chan *structs.CSIVolume @@ -43,13 +50,15 @@ type volumeWatcher struct { func newVolumeWatcher(parent *Watcher, vol *structs.CSIVolume) *volumeWatcher { w := &volumeWatcher{ - updateCh: make(chan *structs.CSIVolume, 1), - v: vol, - state: parent.state, - rpc: parent.rpc, - leaderAcl: parent.leaderAcl, - logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace), - shutdownCtx: parent.ctx, + updateCh: make(chan *structs.CSIVolume, 1), + v: vol, + state: parent.state, + rpc: parent.rpc, + leaderAcl: parent.leaderAcl, + logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace), + shutdownCtx: parent.ctx, + deleteFn: func() { parent.remove(vol.ID + vol.Namespace) }, + quiescentTimeout: parent.quiescentTimeout, } // Start the long lived watcher that scans for allocation updates @@ -111,6 +120,9 @@ func (vw *volumeWatcher) watch() { vol := vw.getVolume(vw.v) vw.volumeReap(vol) + timer, stop := helper.NewSafeTimer(vw.quiescentTimeout) + defer stop() + for { select { // TODO(tgross): currently server->client RPC have no cancellation @@ -120,17 +132,28 @@ func (vw *volumeWatcher) watch() { case <-vw.ctx.Done(): return case vol := <-vw.updateCh: - // while we won't make raft writes if we get a stale update, - // we can still fire extra CSI RPC calls if we don't check this - if vol.ModifyIndex >= vw.v.ModifyIndex { - vol = vw.getVolume(vol) - if vol == nil { - return - } - vw.volumeReap(vol) + vol = vw.getVolume(vol) + if vol == nil { + // We stop the goroutine whenever we have no more + // work, but only delete the watcher when the volume + // is gone to avoid racing the blocking query + vw.deleteFn() + vw.Stop() + return } - default: - vw.Stop() // no pending work + vw.volumeReap(vol) + timer.Reset(vw.quiescentTimeout) + case <-timer.C: + // Wait until the volume has "settled" before stopping + // this goroutine so that the race between shutdown and + // the parent goroutine sending on <-updateCh is pushed to + // after the window we most care about quick freeing of + // claims (and the GC job will clean up anything we miss) + vol = vw.getVolume(vol) + if vol == nil { + vw.deleteFn() + } + vw.Stop() return } } @@ -145,9 +168,12 @@ func (vw *volumeWatcher) getVolume(vol *structs.CSIVolume) *structs.CSIVolume { var err error ws := memdb.NewWatchSet() - vol, err = vw.state.CSIVolumeDenormalizePlugins(ws, vol.Copy()) + vol, err = vw.state.CSIVolumeByID(ws, vol.Namespace, vol.ID) if err != nil { - vw.logger.Error("could not query plugins for volume", "error", err) + vw.logger.Error("could not query for volume", "error", err) + return nil + } + if vol == nil { return nil } @@ -168,9 +194,6 @@ func (vw *volumeWatcher) volumeReap(vol *structs.CSIVolume) { if err != nil { vw.logger.Error("error releasing volume claims", "error", err) } - if vw.isUnclaimed(vol) { - vw.Stop() - } } func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool { @@ -230,6 +253,7 @@ func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIV } func (vw *volumeWatcher) unpublish(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error { + vw.logger.Trace("unpublishing volume", "alloc", claim.AllocationID) req := &structs.CSIVolumeUnpublishRequest{ VolumeID: vol.ID, Claim: claim, diff --git a/nomad/volumewatcher/volumes_watcher.go b/nomad/volumewatcher/volumes_watcher.go index c8c687add..5ae08e1ff 100644 --- a/nomad/volumewatcher/volumes_watcher.go +++ b/nomad/volumewatcher/volumes_watcher.go @@ -3,6 +3,7 @@ package volumewatcher import ( "context" "sync" + "time" log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" @@ -34,9 +35,15 @@ type Watcher struct { ctx context.Context exitFn context.CancelFunc + // quiescentTimeout is the time we wait until the volume has "settled" + // before stopping the child watcher goroutines + quiescentTimeout time.Duration + wlock sync.RWMutex } +var defaultQuiescentTimeout = time.Minute * 5 + // NewVolumesWatcher returns a volumes watcher that is used to watch // volumes and trigger the scheduler as needed. func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *Watcher { @@ -47,11 +54,12 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W ctx, exitFn := context.WithCancel(context.Background()) return &Watcher{ - rpc: rpc, - logger: logger.Named("volumes_watcher"), - ctx: ctx, - exitFn: exitFn, - leaderAcl: leaderAcl, + rpc: rpc, + logger: logger.Named("volumes_watcher"), + ctx: ctx, + exitFn: exitFn, + leaderAcl: leaderAcl, + quiescentTimeout: defaultQuiescentTimeout, } } @@ -157,10 +165,10 @@ func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (in } // add adds a volume to the watch list -func (w *Watcher) add(d *structs.CSIVolume) error { +func (w *Watcher) add(v *structs.CSIVolume) error { w.wlock.Lock() defer w.wlock.Unlock() - _, err := w.addLocked(d) + _, err := w.addLocked(v) return err } @@ -182,3 +190,10 @@ func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) { w.watchers[v.ID+v.Namespace] = watcher return watcher, nil } + +// removes a volume from the watch list +func (w *Watcher) remove(volID string) { + w.wlock.Lock() + defer w.wlock.Unlock() + delete(w.watchers, volID) +} diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 41da11f82..97a83a9b5 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -23,18 +23,22 @@ func TestVolumeWatch_EnableDisable(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 100 * time.Millisecond watcher.SetEnabled(true, srv.State(), "") plugin := mock.CSIPlugin() node := testNode(plugin, srv.State()) alloc := mock.Alloc() alloc.ClientStatus = structs.AllocClientStatusComplete + vol := testVolume(plugin, alloc, node.ID) index++ err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol}) require.NoError(t, err) + // need to have just enough of a volume and claim in place so that + // the watcher doesn't immediately stop and unload itself claim := &structs.CSIVolumeClaim{ Mode: structs.CSIVolumeClaimGC, State: structs.CSIVolumeClaimStateNodeDetached, @@ -62,6 +66,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 100 * time.Millisecond plugin := mock.CSIPlugin() node := testNode(plugin, srv.State()) @@ -120,12 +125,12 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { // create a new watcher and enable it to simulate the leadership // transition watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 100 * time.Millisecond watcher.SetEnabled(true, srv.State(), "") require.Eventually(t, func() bool { watcher.wlock.RLock() defer watcher.wlock.RUnlock() - return 1 == len(watcher.watchers) && !watcher.watchers[vol.ID+vol.Namespace].isRunning() }, time.Second, 10*time.Millisecond) @@ -144,6 +149,7 @@ func TestVolumeWatch_StartStop(t *testing.T) { srv.state = state.TestStateStore(t) index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 100 * time.Millisecond watcher.SetEnabled(true, srv.State(), "") require.Equal(t, 0, len(watcher.watchers)) @@ -240,6 +246,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) { index := uint64(100) watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "") + watcher.quiescentTimeout = 10 * time.Millisecond watcher.SetEnabled(true, srv.State(), "") require.Equal(t, 0, len(watcher.watchers)) @@ -261,5 +268,9 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) { return 1 == len(watcher.watchers) }, time.Second, 10*time.Millisecond) - require.False(t, watcher.watchers[vol.ID+vol.Namespace].isRunning()) + require.Eventually(t, func() bool { + watcher.wlock.RLock() + defer watcher.wlock.RUnlock() + return !watcher.watchers[vol.ID+vol.Namespace].isRunning() + }, 1*time.Second, 10*time.Millisecond) }