CSI: volume watcher shutdown fixes (#12439)

The volume watcher design was based on deploymentwatcher and drainer,
but has an important difference: we don't want to maintain a goroutine
for the lifetime of the volume. So we stop the volumewatcher goroutine
for a volume when that volume has no more claims to free. But the
shutdown races with updates on the parent goroutine, and it's possible
to drop updates. Fortunately these updates are picked up on the next
core GC job, but we're most likely to hit this race when we're
replacing an allocation and that's the time we least want to wait.

Wait until the volume has "settled" before stopping this goroutine so
that the race between shutdown and the parent goroutine sending on
`<-updateCh` is pushed to after the window we most care about quick
freeing of claims.

* Fixes a resource leak when volumewatchers are no longer needed. The
  volume is nil and can't ever be started again, so the volume's
  `watcher` should be removed from the top-level `Watcher`.

* De-flakes the GC job test: the test throws an error because the
  claimed node doesn't exist and is unreachable. This flaked instead of
  failed because we didn't correctly wait for the first pass through the
  volumewatcher.

  Make the GC job wait for the volumewatcher to reach the quiescent
  timeout window state before running the GC eval under test, so that
  we're sure the GC job's work isn't being picked up by processing one
  of the earlier claims. Update the claims used so that we're sure the
  GC pass won't hit a node unpublish error.

* Adds trace logging to unpublish operations
This commit is contained in:
Tim Gross
2022-04-04 10:46:45 -04:00
committed by GitHub
parent f8d693b079
commit f718c132b4
5 changed files with 132 additions and 55 deletions

View File

@@ -2278,10 +2278,10 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
err := store.UpsertNode(structs.MsgTypeTestSetup, index, node)
require.NoError(t, err)
// Note that for volume writes in this test we need to use the
// RPCs rather than StateStore methods directly so that the GC
// job's RPC call updates a later index. otherwise the
// volumewatcher won't trigger for the final GC
// *Important*: for volume writes in this test we must use RPCs
// rather than StateStore methods directly, or the blocking query
// in volumewatcher won't get the final update for GC because it's
// watching on a different store at that point
// Register a volume
vols := []*structs.CSIVolume{{
@@ -2302,11 +2302,11 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
volReq, &structs.CSIVolumeRegisterResponse{})
require.NoError(t, err)
// Create a job with two allocations that claim the volume.
// Create a job with two allocs that claim the volume.
// We use two allocs here, one of which is not running, so
// that we can assert that the volumewatcher has made one
// complete pass (and removed the 2nd alloc) before running
// the GC.
// that we can assert the volumewatcher has made one
// complete pass (and removed the 2nd alloc) before we
// run the GC
eval := mock.Eval()
eval.Status = structs.EvalStatusFailed
index++
@@ -2331,6 +2331,7 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
alloc2.NodeID = node.ID
alloc2.ClientStatus = structs.AllocClientStatusComplete
alloc2.DesiredStatus = structs.AllocDesiredStatusStop
alloc2.Job = job
alloc2.JobID = job.ID
alloc2.EvalID = eval.ID
@@ -2344,42 +2345,66 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
index++
require.NoError(t, store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1, alloc2}))
// Claim the volume for the alloc
req := &structs.CSIVolumeClaimRequest{
AllocationID: alloc1.ID,
NodeID: node.ID,
VolumeID: volID,
Claim: structs.CSIVolumeClaimWrite,
VolumeID: volID,
AllocationID: alloc1.ID,
NodeID: uuid.Generate(), // doesn't exist so we don't get errors trying to unmount volumes from it
Claim: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
WriteRequest: structs.WriteRequest{
Namespace: ns,
Region: srv.config.Region,
},
}
req.Namespace = ns
req.Region = srv.config.Region
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
req, &structs.CSIVolumeClaimResponse{})
require.NoError(t, err)
require.NoError(t, err, "write claim should succeed")
// Delete allocation and job
req.AllocationID = alloc2.ID
req.State = structs.CSIVolumeClaimStateUnpublishing
err = msgpackrpc.CallWithCodec(codec, "CSIVolume.Claim",
req, &structs.CSIVolumeClaimResponse{})
require.NoError(t, err, "unpublishing claim should succeed")
require.Eventually(t, func() bool {
vol, err := store.CSIVolumeByID(ws, ns, volID)
require.NoError(t, err)
return len(vol.WriteClaims) == 1 &&
len(vol.WriteAllocs) == 1 &&
len(vol.PastClaims) == 0
}, time.Second*1, 100*time.Millisecond,
"volumewatcher should have released unpublishing claim without GC")
// At this point we can guarantee that volumewatcher is waiting
// for new work. Delete allocation and job so that the next pass
// thru volumewatcher has more work to do
index, _ = store.LatestIndex()
index++
err = store.DeleteJob(index, ns, job.ID)
require.NoError(t, err)
index, _ = store.LatestIndex()
index++
err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID, alloc2.ID})
err = store.DeleteEval(index, []string{eval.ID}, []string{alloc1.ID})
require.NoError(t, err)
// Create a core scheduler and attempt the volume claim GC
snap, err := store.Snapshot()
require.NoError(t, err)
core := NewCoreScheduler(srv, snap)
index, _ = snap.LatestIndex()
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(t, c.csiVolumeClaimGC(gc))
// sending the GC claim will trigger the volumewatcher's normal
// code path. the volumewatcher will hit an error here because
// there's no path to the node, but this is a node-only plugin so
// we accept that the node has been GC'd and there's no point
// holding onto the claim
// the only remaining claim is for a deleted alloc with no path to
// the non-existent node, so volumewatcher will release the
// remaining claim
require.Eventually(t, func() bool {
vol, _ := store.CSIVolumeByID(ws, ns, volID)
return len(vol.WriteClaims) == 0 &&

View File

@@ -671,6 +671,7 @@ NODE_DETACHED:
}
RELEASE_CLAIM:
v.logger.Trace("releasing claim", "vol", vol.ID)
// advance a CSIVolumeClaimStateControllerDetached claim
claim.State = structs.CSIVolumeClaimStateReadyToFree
err = v.checkpointClaim(vol, claim)
@@ -684,6 +685,7 @@ RELEASE_CLAIM:
}
func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("node unpublish", "vol", vol.ID)
if claim.AllocationID != "" {
err := v.nodeUnpublishVolumeImpl(vol, claim)
if err != nil {
@@ -757,7 +759,7 @@ func (v *CSIVolume) nodeUnpublishVolumeImpl(vol *structs.CSIVolume, claim *struc
}
func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
v.logger.Trace("controller unpublish", "vol", vol.ID)
if !vol.ControllerRequired {
claim.State = structs.CSIVolumeClaimStateReadyToFree
return nil

View File

@@ -3,10 +3,12 @@ package volumewatcher
import (
"context"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
)
@@ -30,6 +32,11 @@ type volumeWatcher struct {
shutdownCtx context.Context // parent context
ctx context.Context // own context
exitFn context.CancelFunc
deleteFn func()
// quiescentTimeout is the time we wait until the volume has "settled"
// before stopping the child watcher goroutines
quiescentTimeout time.Duration
// updateCh is triggered when there is an updated volume
updateCh chan *structs.CSIVolume
@@ -43,13 +50,15 @@ type volumeWatcher struct {
func newVolumeWatcher(parent *Watcher, vol *structs.CSIVolume) *volumeWatcher {
w := &volumeWatcher{
updateCh: make(chan *structs.CSIVolume, 1),
v: vol,
state: parent.state,
rpc: parent.rpc,
leaderAcl: parent.leaderAcl,
logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace),
shutdownCtx: parent.ctx,
updateCh: make(chan *structs.CSIVolume, 1),
v: vol,
state: parent.state,
rpc: parent.rpc,
leaderAcl: parent.leaderAcl,
logger: parent.logger.With("volume_id", vol.ID, "namespace", vol.Namespace),
shutdownCtx: parent.ctx,
deleteFn: func() { parent.remove(vol.ID + vol.Namespace) },
quiescentTimeout: parent.quiescentTimeout,
}
// Start the long lived watcher that scans for allocation updates
@@ -111,6 +120,9 @@ func (vw *volumeWatcher) watch() {
vol := vw.getVolume(vw.v)
vw.volumeReap(vol)
timer, stop := helper.NewSafeTimer(vw.quiescentTimeout)
defer stop()
for {
select {
// TODO(tgross): currently server->client RPC have no cancellation
@@ -120,17 +132,28 @@ func (vw *volumeWatcher) watch() {
case <-vw.ctx.Done():
return
case vol := <-vw.updateCh:
// while we won't make raft writes if we get a stale update,
// we can still fire extra CSI RPC calls if we don't check this
if vol.ModifyIndex >= vw.v.ModifyIndex {
vol = vw.getVolume(vol)
if vol == nil {
return
}
vw.volumeReap(vol)
vol = vw.getVolume(vol)
if vol == nil {
// We stop the goroutine whenever we have no more
// work, but only delete the watcher when the volume
// is gone to avoid racing the blocking query
vw.deleteFn()
vw.Stop()
return
}
default:
vw.Stop() // no pending work
vw.volumeReap(vol)
timer.Reset(vw.quiescentTimeout)
case <-timer.C:
// Wait until the volume has "settled" before stopping
// this goroutine so that the race between shutdown and
// the parent goroutine sending on <-updateCh is pushed to
// after the window we most care about quick freeing of
// claims (and the GC job will clean up anything we miss)
vol = vw.getVolume(vol)
if vol == nil {
vw.deleteFn()
}
vw.Stop()
return
}
}
@@ -145,9 +168,12 @@ func (vw *volumeWatcher) getVolume(vol *structs.CSIVolume) *structs.CSIVolume {
var err error
ws := memdb.NewWatchSet()
vol, err = vw.state.CSIVolumeDenormalizePlugins(ws, vol.Copy())
vol, err = vw.state.CSIVolumeByID(ws, vol.Namespace, vol.ID)
if err != nil {
vw.logger.Error("could not query plugins for volume", "error", err)
vw.logger.Error("could not query for volume", "error", err)
return nil
}
if vol == nil {
return nil
}
@@ -168,9 +194,6 @@ func (vw *volumeWatcher) volumeReap(vol *structs.CSIVolume) {
if err != nil {
vw.logger.Error("error releasing volume claims", "error", err)
}
if vw.isUnclaimed(vol) {
vw.Stop()
}
}
func (vw *volumeWatcher) isUnclaimed(vol *structs.CSIVolume) bool {
@@ -230,6 +253,7 @@ func (vw *volumeWatcher) collectPastClaims(vol *structs.CSIVolume) *structs.CSIV
}
func (vw *volumeWatcher) unpublish(vol *structs.CSIVolume, claim *structs.CSIVolumeClaim) error {
vw.logger.Trace("unpublishing volume", "alloc", claim.AllocationID)
req := &structs.CSIVolumeUnpublishRequest{
VolumeID: vol.ID,
Claim: claim,

View File

@@ -3,6 +3,7 @@ package volumewatcher
import (
"context"
"sync"
"time"
log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
@@ -34,9 +35,15 @@ type Watcher struct {
ctx context.Context
exitFn context.CancelFunc
// quiescentTimeout is the time we wait until the volume has "settled"
// before stopping the child watcher goroutines
quiescentTimeout time.Duration
wlock sync.RWMutex
}
var defaultQuiescentTimeout = time.Minute * 5
// NewVolumesWatcher returns a volumes watcher that is used to watch
// volumes and trigger the scheduler as needed.
func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *Watcher {
@@ -47,11 +54,12 @@ func NewVolumesWatcher(logger log.Logger, rpc CSIVolumeRPC, leaderAcl string) *W
ctx, exitFn := context.WithCancel(context.Background())
return &Watcher{
rpc: rpc,
logger: logger.Named("volumes_watcher"),
ctx: ctx,
exitFn: exitFn,
leaderAcl: leaderAcl,
rpc: rpc,
logger: logger.Named("volumes_watcher"),
ctx: ctx,
exitFn: exitFn,
leaderAcl: leaderAcl,
quiescentTimeout: defaultQuiescentTimeout,
}
}
@@ -157,10 +165,10 @@ func (w *Watcher) getVolumesImpl(ws memdb.WatchSet, state *state.StateStore) (in
}
// add adds a volume to the watch list
func (w *Watcher) add(d *structs.CSIVolume) error {
func (w *Watcher) add(v *structs.CSIVolume) error {
w.wlock.Lock()
defer w.wlock.Unlock()
_, err := w.addLocked(d)
_, err := w.addLocked(v)
return err
}
@@ -182,3 +190,10 @@ func (w *Watcher) addLocked(v *structs.CSIVolume) (*volumeWatcher, error) {
w.watchers[v.ID+v.Namespace] = watcher
return watcher, nil
}
// removes a volume from the watch list
func (w *Watcher) remove(volID string) {
w.wlock.Lock()
defer w.wlock.Unlock()
delete(w.watchers, volID)
}

View File

@@ -23,18 +23,22 @@ func TestVolumeWatch_EnableDisable(t *testing.T) {
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.quiescentTimeout = 100 * time.Millisecond
watcher.SetEnabled(true, srv.State(), "")
plugin := mock.CSIPlugin()
node := testNode(plugin, srv.State())
alloc := mock.Alloc()
alloc.ClientStatus = structs.AllocClientStatusComplete
vol := testVolume(plugin, alloc, node.ID)
index++
err := srv.State().UpsertCSIVolume(index, []*structs.CSIVolume{vol})
require.NoError(t, err)
// need to have just enough of a volume and claim in place so that
// the watcher doesn't immediately stop and unload itself
claim := &structs.CSIVolumeClaim{
Mode: structs.CSIVolumeClaimGC,
State: structs.CSIVolumeClaimStateNodeDetached,
@@ -62,6 +66,7 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.quiescentTimeout = 100 * time.Millisecond
plugin := mock.CSIPlugin()
node := testNode(plugin, srv.State())
@@ -120,12 +125,12 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) {
// create a new watcher and enable it to simulate the leadership
// transition
watcher = NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.quiescentTimeout = 100 * time.Millisecond
watcher.SetEnabled(true, srv.State(), "")
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return 1 == len(watcher.watchers) &&
!watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, time.Second, 10*time.Millisecond)
@@ -144,6 +149,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
srv.state = state.TestStateStore(t)
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.quiescentTimeout = 100 * time.Millisecond
watcher.SetEnabled(true, srv.State(), "")
require.Equal(t, 0, len(watcher.watchers))
@@ -240,6 +246,7 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) {
index := uint64(100)
watcher := NewVolumesWatcher(testlog.HCLogger(t), srv, "")
watcher.quiescentTimeout = 10 * time.Millisecond
watcher.SetEnabled(true, srv.State(), "")
require.Equal(t, 0, len(watcher.watchers))
@@ -261,5 +268,9 @@ func TestVolumeWatch_RegisterDeregister(t *testing.T) {
return 1 == len(watcher.watchers)
}, time.Second, 10*time.Millisecond)
require.False(t, watcher.watchers[vol.ID+vol.Namespace].isRunning())
require.Eventually(t, func() bool {
watcher.wlock.RLock()
defer watcher.wlock.RUnlock()
return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
}, 1*time.Second, 10*time.Millisecond)
}