deployment watcher: fix goroutine leak when job is purged (#20348)

The deployment watcher on the leader makes blocking queries to detect when the
set of active deployments changes. It takes the resulting list of deployments
and adds or removes watchers based on whether the deployment is active. But when
a job is purged, the deployment will be deleted. This unblocks the query but
the query result only shows the remaining deployments.

When the query unblocks, ensure that all active watchers have a corresponding
deployment in state. If not, remove the watcher so that the goroutine stops.

Fixes: https://github.com/hashicorp/nomad/issues/19988
This commit is contained in:
Tim Gross
2024-04-11 14:51:05 -04:00
committed by GitHub
parent ca22f34373
commit a13e455c51
3 changed files with 85 additions and 4 deletions

3
.changelog/20348.txt Normal file
View File

@@ -0,0 +1,3 @@
```release-note:bug
deployments: Fixed a goroutine leak when jobs are purged
```

View File

@@ -180,8 +180,7 @@ func (w *Watcher) watchDeployments(ctx context.Context) {
// Update the latest index
dindex = idx
// Ensure we are tracking the things we should and not tracking what we
// shouldn't be
// Ensure we are tracking only active deployments
for _, d := range deployments {
if d.Active() {
if err := w.add(d); err != nil {
@@ -191,6 +190,9 @@ func (w *Watcher) watchDeployments(ctx context.Context) {
w.remove(d)
}
}
// Ensure we've removed deployments for purged jobs
w.removeDeletedDeployments(deployments)
}
}
@@ -236,6 +238,28 @@ func (w *Watcher) getDeploysImpl(ws memdb.WatchSet, store *state.StateStore) (in
return deploys, index, nil
}
// removeDeletedDeployments removes any watchers that aren't in the list of
// deployments we got from state
func (w *Watcher) removeDeletedDeployments(deployments []*structs.Deployment) {
w.l.Lock()
defer w.l.Unlock()
// note we can't optimize this by checking the lengths first because some
// deployments might not be active
for _, watcher := range w.watchers {
var found bool
for _, d := range deployments {
if watcher.deploymentID == d.ID {
found = true
break
}
}
if !found {
w.removeByIDLocked(watcher.deploymentID)
}
}
}
// add adds a deployment to the watch list
func (w *Watcher) add(d *structs.Deployment) error {
w.l.Lock()
@@ -287,15 +311,18 @@ func (w *Watcher) addLocked(d *structs.Deployment) (*deploymentWatcher, error) {
func (w *Watcher) remove(d *structs.Deployment) {
w.l.Lock()
defer w.l.Unlock()
w.removeByIDLocked(d.ID)
}
func (w *Watcher) removeByIDLocked(id string) {
// Not enabled so no-op
if !w.enabled {
return
}
if watcher, ok := w.watchers[d.ID]; ok {
if watcher, ok := w.watchers[id]; ok {
watcher.StopWatch()
delete(w.watchers, d.ID)
delete(w.watchers, id)
}
}

View File

@@ -17,6 +17,7 @@ import (
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
"github.com/shoenig/test/must"
"github.com/shoenig/test/wait"
"github.com/stretchr/testify/assert"
mocker "github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
@@ -2081,3 +2082,53 @@ func watchersCount(w *Watcher) int {
return len(w.watchers)
}
// TestWatcher_PurgeDeployment tests that we don't leak watchers if a job is purged
func TestWatcher_PurgeDeployment(t *testing.T) {
ci.Parallel(t)
w, m := defaultTestDeploymentWatcher(t)
// clear UpdateDeploymentStatus default expectation
m.Mock.ExpectedCalls = nil
// Create a job and a deployment
j := mock.Job()
d := mock.Deployment()
d.JobID = j.ID
must.NoError(t, m.state.UpsertJob(structs.MsgTypeTestSetup, m.nextIndex(), nil, j))
must.NoError(t, m.state.UpsertDeployment(m.nextIndex(), d))
// require that we get a call to UpsertDeploymentStatusUpdate
matchConfig := &matchDeploymentStatusUpdateConfig{
DeploymentID: d.ID,
Status: structs.DeploymentStatusPaused,
StatusDescription: structs.DeploymentStatusDescriptionPaused,
}
matcher := matchDeploymentStatusUpdateRequest(matchConfig)
m.On("UpdateDeploymentStatus", mocker.MatchedBy(matcher)).Return(nil)
w.SetEnabled(true, m.state)
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if watchersCount(w) != 1 {
return fmt.Errorf("expected 1 deployment")
}
return nil
}),
wait.Attempts(100),
wait.Gap(10*time.Millisecond),
))
must.NoError(t, m.state.DeleteJob(m.nextIndex(), j.Namespace, j.ID))
must.Wait(t, wait.InitialSuccess(
wait.ErrorFunc(func() error {
if watchersCount(w) != 0 {
return fmt.Errorf("expected deployment watcher to be stopped")
}
return nil
}),
wait.Attempts(500),
wait.Gap(10*time.Millisecond),
))
}