diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 9f9ca013f..dcb3ef635 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -4,7 +4,7 @@ import ( "sync" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/lib" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/helper" @@ -138,8 +138,8 @@ func (b *BlockedEvals) SetEnabled(enabled bool) { b.l.Unlock() return } else if enabled { - go b.watchCapacity() - go b.prune() + go b.watchCapacity(b.stopCh, b.capacityChangeCh) + go b.prune(b.stopCh) } else { close(b.stopCh) } @@ -475,12 +475,12 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) { // watchCapacity is a long lived function that watches for capacity changes in // nodes and unblocks the correct set of evals. -func (b *BlockedEvals) watchCapacity() { +func (b *BlockedEvals) watchCapacity(stopCh <-chan struct{}, changeCh <-chan *capacityUpdate) { for { select { - case <-b.stopCh: + case <-stopCh: return - case update := <-b.capacityChangeCh: + case update := <-changeCh: b.unblock(update.computedClass, update.quotaChange, update.index) } } @@ -606,6 +606,11 @@ SCAN: b.l.Unlock() return dups } + + // Capture chans inside the lock to prevent a race with them getting + // reset in Flush + dupCh := b.duplicateCh + stopCh := b.stopCh b.l.Unlock() // Create the timer @@ -616,11 +621,11 @@ SCAN: } select { - case <-b.stopCh: + case <-stopCh: return nil case <-timeoutCh: return nil - case <-b.duplicateCh: + case <-dupCh: goto SCAN } } @@ -676,13 +681,13 @@ func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) { } // prune is a long lived function that prunes unnecessary objects on a timer. -func (b *BlockedEvals) prune() { +func (b *BlockedEvals) prune(stopCh <-chan struct{}) { ticker := time.NewTicker(pruneInterval) defer ticker.Stop() for { select { - case <-b.stopCh: + case <-stopCh: return case <-ticker.C: b.pruneUnblockIndexes()