diff --git a/client/gc_test.go b/client/gc_test.go index b40030368..101b91c47 100644 --- a/client/gc_test.go +++ b/client/gc_test.go @@ -30,7 +30,7 @@ func gcConfig() *GCConfig { // runners to be terminal func exitAllocRunner(runners ...AllocRunner) { for _, ar := range runners { - terminalAlloc := ar.Alloc() + terminalAlloc := ar.Alloc().Copy() terminalAlloc.DesiredStatus = structs.AllocDesiredStatusStop ar.Update(terminalAlloc) } diff --git a/client/pluginmanager/drivermanager/manager.go b/client/pluginmanager/drivermanager/manager.go index 76efd50da..9c8c6af80 100644 --- a/client/pluginmanager/drivermanager/manager.go +++ b/client/pluginmanager/drivermanager/manager.go @@ -242,6 +242,7 @@ func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.Ca return } + var mu sync.Mutex var availDrivers []string var wg sync.WaitGroup @@ -253,7 +254,9 @@ func (m *manager) waitForFirstFingerprint(ctx context.Context, cancel context.Ca defer wg.Done() instance.WaitForFirstFingerprint(ctx) if instance.getLastHealth() != drivers.HealthStateUndetected { + mu.Lock() availDrivers = append(availDrivers, name) + mu.Unlock() } }(n, i) } diff --git a/nomad/blocked_evals.go b/nomad/blocked_evals.go index 9f9ca013f..dcb3ef635 100644 --- a/nomad/blocked_evals.go +++ b/nomad/blocked_evals.go @@ -4,7 +4,7 @@ import ( "sync" "time" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/lib" log "github.com/hashicorp/go-hclog" "github.com/hashicorp/nomad/helper" @@ -138,8 +138,8 @@ func (b *BlockedEvals) SetEnabled(enabled bool) { b.l.Unlock() return } else if enabled { - go b.watchCapacity() - go b.prune() + go b.watchCapacity(b.stopCh, b.capacityChangeCh) + go b.prune(b.stopCh) } else { close(b.stopCh) } @@ -475,12 +475,12 @@ func (b *BlockedEvals) UnblockClassAndQuota(class, quota string, index uint64) { // watchCapacity is a long lived function that watches for capacity changes in // nodes and unblocks the correct set of evals. -func (b *BlockedEvals) watchCapacity() { +func (b *BlockedEvals) watchCapacity(stopCh <-chan struct{}, changeCh <-chan *capacityUpdate) { for { select { - case <-b.stopCh: + case <-stopCh: return - case update := <-b.capacityChangeCh: + case update := <-changeCh: b.unblock(update.computedClass, update.quotaChange, update.index) } } @@ -606,6 +606,11 @@ SCAN: b.l.Unlock() return dups } + + // Capture chans inside the lock to prevent a race with them getting + // reset in Flush + dupCh := b.duplicateCh + stopCh := b.stopCh b.l.Unlock() // Create the timer @@ -616,11 +621,11 @@ SCAN: } select { - case <-b.stopCh: + case <-stopCh: return nil case <-timeoutCh: return nil - case <-b.duplicateCh: + case <-dupCh: goto SCAN } } @@ -676,13 +681,13 @@ func (b *BlockedEvals) EmitStats(period time.Duration, stopCh chan struct{}) { } // prune is a long lived function that prunes unnecessary objects on a timer. -func (b *BlockedEvals) prune() { +func (b *BlockedEvals) prune(stopCh <-chan struct{}) { ticker := time.NewTicker(pruneInterval) defer ticker.Stop() for { select { - case <-b.stopCh: + case <-stopCh: return case <-ticker.C: b.pruneUnblockIndexes() diff --git a/nomad/eval_broker.go b/nomad/eval_broker.go index e507a060c..d3dfdc61e 100644 --- a/nomad/eval_broker.go +++ b/nomad/eval_broker.go @@ -10,7 +10,7 @@ import ( "context" - "github.com/armon/go-metrics" + metrics "github.com/armon/go-metrics" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/lib/delayheap" "github.com/hashicorp/nomad/nomad/structs" @@ -167,7 +167,7 @@ func (b *EvalBroker) SetEnabled(enabled bool) { // start the go routine for delayed evals ctx, cancel := context.WithCancel(context.Background()) b.delayedEvalCancelFunc = cancel - go b.runDelayedEvalsWatcher(ctx) + go b.runDelayedEvalsWatcher(ctx, b.delayedEvalsUpdateCh) } b.l.Unlock() if !enabled { @@ -741,7 +741,7 @@ func (d *evalWrapper) Namespace() string { // runDelayedEvalsWatcher is a long-lived function that waits till a time deadline is met for // pending evaluations before enqueuing them -func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) { +func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context, updateCh <-chan struct{}) { var timerChannel <-chan time.Time var delayTimer *time.Timer for { @@ -768,7 +768,7 @@ func (b *EvalBroker) runDelayedEvalsWatcher(ctx context.Context) { b.stats.TotalWaiting -= 1 b.enqueueLocked(eval, eval.Type) b.l.Unlock() - case <-b.delayedEvalsUpdateCh: + case <-updateCh: continue } } diff --git a/nomad/leader_test.go b/nomad/leader_test.go index dd02761e5..5f8b81526 100644 --- a/nomad/leader_test.go +++ b/nomad/leader_test.go @@ -399,6 +399,8 @@ func TestLeader_PeriodicDispatcher_Restore_Adds(t *testing.T) { // Check that the new leader is tracking the periodic job only testutil.WaitForResult(func() (bool, error) { + leader.periodicDispatcher.l.Lock() + defer leader.periodicDispatcher.l.Unlock() if _, tracked := leader.periodicDispatcher.tracked[tuplePeriodic]; !tracked { return false, fmt.Errorf("periodic job not tracked") } diff --git a/nomad/periodic.go b/nomad/periodic.go index edd295461..26c2a6715 100644 --- a/nomad/periodic.go +++ b/nomad/periodic.go @@ -173,7 +173,7 @@ func (p *PeriodicDispatch) SetEnabled(enabled bool) { // If we are transitioning from disabled to enabled, run the daemon. ctx, cancel := context.WithCancel(context.Background()) p.stopFn = cancel - go p.run(ctx) + go p.run(ctx, p.updateCh) } } @@ -320,7 +320,7 @@ func (p *PeriodicDispatch) shouldRun() bool { // run is a long-lived function that waits till a job's periodic spec is met and // then creates an evaluation to run the job. -func (p *PeriodicDispatch) run(ctx context.Context) { +func (p *PeriodicDispatch) run(ctx context.Context, updateCh <-chan struct{}) { var launchCh <-chan time.Time for p.shouldRun() { job, launch := p.nextLaunch() @@ -335,7 +335,7 @@ func (p *PeriodicDispatch) run(ctx context.Context) { select { case <-ctx.Done(): return - case <-p.updateCh: + case <-updateCh: continue case <-launchCh: p.dispatch(job, launch) diff --git a/nomad/periodic_endpoint_test.go b/nomad/periodic_endpoint_test.go index b16b4eb64..db50451b9 100644 --- a/nomad/periodic_endpoint_test.go +++ b/nomad/periodic_endpoint_test.go @@ -4,7 +4,7 @@ import ( "testing" memdb "github.com/hashicorp/go-memdb" - "github.com/hashicorp/net-rpc-msgpackrpc" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" "github.com/hashicorp/nomad/acl" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" @@ -120,8 +120,9 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) { ws := memdb.NewWatchSet() eval, err := state.EvalByID(ws, resp.EvalID) assert.Nil(err) - assert.NotNil(eval) - assert.Equal(eval.CreateIndex, resp.EvalCreateIndex) + if assert.NotNil(eval) { + assert.Equal(eval.CreateIndex, resp.EvalCreateIndex) + } } // Fetch the response with management token @@ -135,8 +136,9 @@ func TestPeriodicEndpoint_Force_ACL(t *testing.T) { ws := memdb.NewWatchSet() eval, err := state.EvalByID(ws, resp.EvalID) assert.Nil(err) - assert.NotNil(eval) - assert.Equal(eval.CreateIndex, resp.EvalCreateIndex) + if assert.NotNil(eval) { + assert.Equal(eval.CreateIndex, resp.EvalCreateIndex) + } } }