volumewatcher: stop watcher goroutines when there's no work (#7909)
The watcher goroutines will be automatically started if a volume has updates, but when idle we shouldn't keep a goroutine running and taking up memory.
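Below is a minimal, self-contained Go sketch of that lifecycle. It is not Nomad's volumewatcher code: the names (idleWatcher, Notify, process, updateCh) are illustrative only. It shows a goroutine that exits through a select default branch once its update queue is drained, and an owner that restarts it on the next notification; the re-check under the lock keeps an update that races with the shutdown decision from being stranded.

package main

import (
	"fmt"
	"sync"
	"time"
)

// idleWatcher is a hypothetical stand-in for a per-volume watcher.
type idleWatcher struct {
	mu       sync.Mutex
	running  bool
	updateCh chan string // buffered queue of pending updates
}

func newIdleWatcher() *idleWatcher {
	return &idleWatcher{updateCh: make(chan string, 8)}
}

// Notify enqueues an update and (re)starts the watcher goroutine if needed.
func (w *idleWatcher) Notify(update string) {
	w.updateCh <- update
	w.mu.Lock()
	defer w.mu.Unlock()
	if !w.running {
		w.running = true
		go w.watch()
	}
}

// watch processes queued updates; once the queue is empty it marks itself
// stopped and returns, so an idle resource does not pin a goroutine in memory.
func (w *idleWatcher) watch() {
	for {
		select {
		case update := <-w.updateCh:
			w.process(update)
		default:
			w.mu.Lock()
			// Re-check the queue under the lock: an update enqueued while we
			// were deciding to stop must not be left behind.
			select {
			case update := <-w.updateCh:
				w.mu.Unlock()
				w.process(update)
			default:
				w.running = false // the next Notify starts a fresh goroutine
				w.mu.Unlock()
				return
			}
		}
	}
}

func (w *idleWatcher) process(update string) {
	fmt.Println("processed:", update)
}

func main() {
	w := newIdleWatcher()
	w.Notify("claim released")
	time.Sleep(50 * time.Millisecond) // watcher drains the queue and exits
	w.Notify("volume reaped")         // a stopped watcher restarts on notification
	time.Sleep(50 * time.Millisecond)
}

The actual change below has the same shape: the watch() loop gains a default case that calls vw.Stop() and returns, and the tests are updated to expect a watcher that stops when idle and restores itself on notification.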
@@ -124,6 +124,9 @@ func (vw *volumeWatcher) watch() {
 				}
 				vw.volumeReap(vol)
 			}
+		default:
+			vw.Stop() // no pending work
+			return
 		}
 	}
 }

@@ -92,10 +92,9 @@ func TestVolumeWatch_Checkpoint(t *testing.T) {
 	// step-up again
 	watcher.SetEnabled(true, srv.State())
 	require.Eventually(func() bool {
-		return 1 == len(watcher.watchers)
+		return 1 == len(watcher.watchers) &&
+			!watcher.watchers[vol.ID+vol.Namespace].isRunning()
 	}, time.Second, 10*time.Millisecond)
-
-	require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning())
 }
 
 // TestVolumeWatch_StartStop tests the start and stop of the watcher when
@@ -123,33 +122,33 @@ func TestVolumeWatch_StartStop(t *testing.T) {
 
 	plugin := mock.CSIPlugin()
 	node := testNode(nil, plugin, srv.State())
-	alloc := mock.Alloc()
-	alloc.ClientStatus = structs.AllocClientStatusRunning
+	alloc1 := mock.Alloc()
+	alloc1.ClientStatus = structs.AllocClientStatusRunning
 	alloc2 := mock.Alloc()
-	alloc2.Job = alloc.Job
+	alloc2.Job = alloc1.Job
 	alloc2.ClientStatus = structs.AllocClientStatusRunning
 	index++
-	err := srv.State().UpsertJob(index, alloc.Job)
+	err := srv.State().UpsertJob(index, alloc1.Job)
 	require.NoError(err)
 	index++
-	err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc, alloc2})
+	err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc1, alloc2})
 	require.NoError(err)
 
 	// register a volume
-	vol := testVolume(nil, plugin, alloc, node.ID)
+	vol := testVolume(nil, plugin, alloc1, node.ID)
 	index++
 	err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol})
 	require.NoError(err)
 
-	// assert we get a running watcher
+	// assert we get a watcher; there are no claims so it should immediately stop
 	require.Eventually(func() bool {
-		return 1 == len(watcher.watchers)
-	}, time.Second, 10*time.Millisecond)
-	require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning())
+		return 1 == len(watcher.watchers) &&
+			!watcher.watchers[vol.ID+vol.Namespace].isRunning()
+	}, time.Second*2, 10*time.Millisecond)
 
 	// claim the volume for both allocs
 	claim := &structs.CSIVolumeClaim{
-		AllocationID: alloc.ID,
+		AllocationID: alloc1.ID,
 		NodeID:       node.ID,
 		Mode:         structs.CSIVolumeClaimRead,
 	}
@@ -163,33 +162,38 @@ func TestVolumeWatch_StartStop(t *testing.T) {
 
 	// reap the volume and assert nothing has happened
 	claim = &structs.CSIVolumeClaim{
-		AllocationID: alloc.ID,
+		AllocationID: alloc1.ID,
 		NodeID:       node.ID,
 		Mode:         structs.CSIVolumeClaimRelease,
 	}
 	index++
 	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
 	require.NoError(err)
 	require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning())
 
 	ws := memdb.NewWatchSet()
 	vol, _ = srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
 	require.Equal(2, len(vol.ReadAllocs))
 
 	// alloc becomes terminal
-	alloc.ClientStatus = structs.AllocClientStatusComplete
+	alloc1.ClientStatus = structs.AllocClientStatusComplete
 	index++
-	err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc})
+	err = srv.State().UpsertAllocs(index, []*structs.Allocation{alloc1})
 	require.NoError(err)
 	index++
 	claim.State = structs.CSIVolumeClaimStateReadyToFree
 	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
 	require.NoError(err)
 
-	// 1 claim has been released but watcher is still running
+	// 1 claim has been released and watcher stops
 	require.Eventually(func() bool {
 		ws := memdb.NewWatchSet()
 		vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
 		return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0
 	}, time.Second*2, 10*time.Millisecond)
 
-	require.True(watcher.watchers[vol.ID+vol.Namespace].isRunning())
+	require.Eventually(func() bool {
+		return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
+	}, time.Second*5, 10*time.Millisecond)
 
 	// the watcher will have incremented the index so we need to make sure
 	// our inserts will trigger new events
@@ -209,16 +213,16 @@ func TestVolumeWatch_StartStop(t *testing.T) {
 	err = srv.State().CSIVolumeClaim(index, vol.Namespace, vol.ID, claim)
 	require.NoError(err)
 
-	// all claims have been released and watcher is stopped
+	// all claims have been released and watcher has stopped again
 	require.Eventually(func() bool {
 		ws := memdb.NewWatchSet()
 		vol, _ := srv.State().CSIVolumeByID(ws, vol.Namespace, vol.ID)
-		return len(vol.ReadAllocs) == 1 && len(vol.PastClaims) == 0
+		return len(vol.ReadAllocs) == 0 && len(vol.PastClaims) == 0
 	}, time.Second*2, 10*time.Millisecond)
 
 	require.Eventually(func() bool {
 		return !watcher.watchers[vol.ID+vol.Namespace].isRunning()
-	}, time.Second*1, 10*time.Millisecond)
+	}, time.Second*5, 10*time.Millisecond)
 
 	// the watcher will have incremented the index so we need to make sure
 	// our inserts will trigger new events
@@ -242,7 +246,7 @@ func TestVolumeWatch_StartStop(t *testing.T) {
 	// a stopped watcher should restore itself on notification
 	require.Eventually(func() bool {
 		return watcher.watchers[vol.ID+vol.Namespace].isRunning()
-	}, time.Second*1, 10*time.Millisecond)
+	}, time.Second*5, 10*time.Millisecond)
 }
 
 // TestVolumeWatch_RegisterDeregister tests the start and stop of