diff --git a/nomad/volumewatcher/volume_watcher.go b/nomad/volumewatcher/volume_watcher.go index efa54e78c..6579564d6 100644 --- a/nomad/volumewatcher/volume_watcher.go +++ b/nomad/volumewatcher/volume_watcher.go @@ -124,6 +124,9 @@ func (vw *volumeWatcher) watch() { } vw.volumeReap(vol) } + default: + vw.Stop() // no pending work + return } } } diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 4236068a8..3d00fa13b 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -92,10 +92,9 @@ func TestVolumeWatch_Checkpoint(t *testing.T) { // step-up again watcher.SetEnabled(true, srv.State()) require.Eventually(func() bool { - return 1 == len(watcher.watchers) + return 1 == len(watcher.watchers) && + !watcher.watchers[vol.ID+vol.Namespace].isRunning() }, time.Second, 10*time.Millisecond) - - require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning()) } // TestVolumeWatch_StartStop tests the start and stop of the watcher when @@ -123,33 +122,33 @@ func TestVolumeWatch_StartStop(t *testing.T) { plugin := mock.CSIPlugin() node := testNode(nil, plugin, srv.State()) - alloc := mock.Alloc() - alloc.ClientStatus = structs.AllocClientStatusRunning + alloc1 := mock.Alloc() + alloc1.ClientStatus = structs.AllocClientStatusRunning alloc2 := mock.Alloc() - alloc2.Job = alloc.Job + alloc2.Job = alloc1.Job alloc2.ClientStatus = structs.AllocClientStatusRunning index++ - err := srv.State().UpsertJob(index, alloc.Job) + err := srv.State().UpsertJob(index, alloc1.Job) require.NoError(err) index++ - err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc, alloc2}) + err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2}) require.NoError(err) // register a volume - vol := testVolume(nil, plugin, alloc, node.ID) + vol := testVolume(nil, plugin, alloc1, node.ID) index++ err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(err) - // assert we get a running watcher + // assert we get a watcher; there are no claims so it should immediately stop require.Eventually(func() bool { - return 1 == len(watcher.watchers) - }, time.Second, 10*time.Millisecond) - require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning()) + return 1 == len(watcher.watchers) && + !watcher.watchers[vol.ID+vol.Namespace].isRunning() + }, time.Second*2, 10*time.Millisecond) // claim the volume for both allocs claim := &structs.CSIVolumeClaim{ - AllocationID: alloc.ID, + AllocationID: alloc1.ID, NodeID: node.ID, Mode: structs.CSIVolumeClaimRead, } @@ -163,33 +162,38 @@ func TestVolumeWatch_StartStop(t *testing.T) { // reap the volume and assert nothing has happened claim = &structs.CSIVolumeClaim{ - AllocationID: alloc.ID, + AllocationID: alloc1.ID, NodeID: node.ID, Mode: structs.CSIVolumeClaimRelease, } index++ err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim) require.NoError(err) - require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning()) + + ws := memdb.NewWatchSet() + vol, _ = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID) + require.Equal(2, len(vol.ReadAllocs)) // alloc becomes terminal - alloc.ClientStatus = structs.AllocClientStatusComplete + alloc1.ClientStatus = structs.AllocClientStatusComplete index++ - err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc}) + err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc1}) require.NoError(err) index++ claim.State = structs.CSIVolumeClaimStateReadyToFree err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim) require.NoError(err) - // 1 claim has been released but watcher is still running + // 1 claim has been released and watcher stops require.Eventually(func() bool { ws := memdb.NewWatchSet() vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID) return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0 }, time.Second*2, 10*time.Millisecond) - require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning()) + require.Eventually(func() bool { + return !watcher.watchers[vol.ID+vol.Namespace].isRunning() + }, time.Second*5, 10*time.Millisecond) // the watcher will have incremented the index so we need to make sure // our inserts will trigger new events @@ -209,16 +213,16 @@ func TestVolumeWatch_StartStop(t *testing.T) { err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim) require.NoError(err) - // all claims have been released and watcher is stopped + // all claims have been released and watcher has stopped again require.Eventually(func() bool { ws := memdb.NewWatchSet() vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID) - return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0 + return len(vol.ReadAllocs) == 0 && len(vol.PastClaims) == 0 }, time.Second*2, 10*time.Millisecond) require.Eventually(func() bool { return !watcher.watchers[vol.ID+vol.Namespace].isRunning() - }, time.Second*1, 10*time.Millisecond) + }, time.Second*5, 10*time.Millisecond) // the watcher will have incremented the index so we need to make sure // our inserts will trigger new events @@ -242,7 +246,7 @@ func TestVolumeWatch_StartStop(t *testing.T) { // a stopped watcher should restore itself on notification require.Eventually(func() bool { return watcher.watchers[vol.ID+vol.Namespace].isRunning() - }, time.Second*1, 10*time.Millisecond) + }, time.Second*5, 10*time.Millisecond) } // TestVolumeWatch_RegisterDeregister tests the start and stop of