diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 3a5ab0f6d..80fc1d6a1 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -49,6 +49,12 @@ type allocRunner struct { // before this goroutine exits. taskStateUpdateHandlerCh chan struct{} + // allocUpdatedCh is a channel that is used to stream allocation updates into + // the allocUpdate handler. Must have cap==1 to allow nonblocking notification + // of new allocation updates while the goroutine is processing a previous + // update. + allocUpdatedCh chan *structs.Allocation + // consulClient is the client used by the consul service hook for // registering services and checks consulClient consul.ConsulServiceAPI @@ -64,12 +70,32 @@ type allocRunner struct { // to access. destroyed bool + // destroyCh is closed when the Run() loop has exited, postrun hooks have + // run, and alloc runner has been destroyed. + destroyCh chan struct{} + + // shutdown is true when the Run() loop has exited, and shutdown hooks have + // run. Must acquire destroyedLock to access. + shutdown bool + + // shutdownCh is closed when the Run() loop has exited, and shutdown hooks + // have run. + shutdownCh chan struct{} + // runnersLaunched is true if TaskRunners were Run. Must acquire // destroyedLock to access. runnersLaunched bool - // destroyedLock guards destroyed, runnersLaunched, and serializes - // Shutdown/Destroy calls. + // destroyLaunched is true if Destroy has been called. Must acquire + // destroyedLock to access. + destroyLaunched bool + + // shutdownLaunched is true if Shutdown has been called. Must acquire + // destroyedLock to access. + shutdownLaunched bool + + // destroyedLock guards destroyed, shutdown, runnersLaunched, destroyLaunched, + // shutdownLaunched, and serializes Shutdown/Destroy calls. destroyedLock sync.Mutex // Alloc captures the allocation being run. 
@@ -130,11 +156,14 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { vaultClient: config.Vault, tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), waitCh: make(chan struct{}), + destroyCh: make(chan struct{}), + shutdownCh: make(chan struct{}), state: &state.State{}, stateDB: config.StateDB, stateUpdater: config.StateUpdater, taskStateUpdatedCh: make(chan struct{}, 1), taskStateUpdateHandlerCh: make(chan struct{}), + allocUpdatedCh: make(chan *structs.Allocation, 1), deviceStatsReporter: config.DeviceStatsReporter, prevAllocWatcher: config.PrevAllocWatcher, prevAllocMigrator: config.PrevAllocMigrator, @@ -204,6 +233,9 @@ func (ar *allocRunner) Run() { // Start the task state update handler go ar.handleTaskStateUpdates() + // Start the alloc update handler + go ar.handleAllocUpdates() + // If an alloc should not be run, ensure any restored task handles are // destroyed and exit to wait for the AR to be GC'd by the client. if !ar.shouldRun() { @@ -579,12 +611,55 @@ func (ar *allocRunner) AllocState() *state.State { return state } -// Update the running allocation with a new version received from the server. -// +// Update asynchronously updates the running allocation with a new version +// received from the server. +// When processing a new update, we will first attempt to drain stale updates +// from the queue, before appending the new one. +func (ar *allocRunner) Update(update *structs.Allocation) { + select { + // Drain queued update from the channel if possible, and check the modify + // index + case oldUpdate := <-ar.allocUpdatedCh: + // If the old update is newer than the replacement, then skip the new one + // and return. This case shouldn't happen, but may in the case of a bug + // elsewhere inside the system. 
+ if oldUpdate.AllocModifyIndex > update.AllocModifyIndex { + ar.logger.Warn("Discarding allocation update due to newer alloc revision in queue", + "old_modify_index", oldUpdate.AllocModifyIndex, + "new_modify_index", update.AllocModifyIndex) + ar.allocUpdatedCh <- oldUpdate + return + } else { + ar.logger.Trace("Discarding allocation update", + "skipped_modify_index", oldUpdate.AllocModifyIndex, + "new_modify_index", update.AllocModifyIndex) + } + case <-ar.waitCh: + ar.logger.Trace("AllocRunner has terminated, skipping alloc update", + "modify_index", update.AllocModifyIndex) + return + default: + } + + // Queue the new update + ar.allocUpdatedCh <- update +} + +func (ar *allocRunner) handleAllocUpdates() { + for { + select { + case update := <-ar.allocUpdatedCh: + ar.handleAllocUpdate(update) + case <-ar.waitCh: + return // a bare break only leaves the select, so the loop would spin forever once waitCh is closed + } + } +} + // This method sends the updated alloc to Run for serially processing updates. // If there is already a pending update it will be discarded and replaced by // the latest update. -func (ar *allocRunner) Update(update *structs.Allocation) { +func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) { // Detect Stop updates stopping := !ar.Alloc().TerminalStatus() && update.TerminalStatus() @@ -615,19 +690,7 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener { return ar.allocBroadcaster.Listen() } -// Destroy the alloc runner by synchronously stopping it if it is still running -// and cleaning up all of its resources. -// -// This method is safe for calling concurrently with Run() and will cause it to -// exit (thus closing WaitCh). -func (ar *allocRunner) Destroy() { - ar.destroyedLock.Lock() - defer ar.destroyedLock.Unlock() - if ar.destroyed { - // Only destroy once - return - } - +func (ar *allocRunner) destroyImpl() { // Stop any running tasks and persist states in case the client is // shutdown before Destroy finishes. 
states := ar.killTasks() @@ -652,7 +715,53 @@ func (ar *allocRunner) Destroy() { } // Mark alloc as destroyed + ar.destroyedLock.Lock() + + if !ar.shutdown { + ar.shutdown = true + close(ar.shutdownCh) + } + ar.destroyed = true + close(ar.destroyCh) + + ar.destroyedLock.Unlock() +} + +// Destroy the alloc runner by stopping it if it is still running and cleaning +// up all of its resources. +// +// This method is safe for calling concurrently with Run() and will cause it to +// exit (thus closing WaitCh). +// When the destroy action is completed, it will close DestroyCh(). +func (ar *allocRunner) Destroy() { + ar.destroyedLock.Lock() + defer ar.destroyedLock.Unlock() + + if ar.destroyed { + // Only destroy once + return + } + + if ar.destroyLaunched { + // Only dispatch a destroy once + return + } + + ar.destroyLaunched = true + + // Synchronize calls to shutdown/destroy + if ar.shutdownLaunched { + go func() { + ar.logger.Debug("Waiting for shutdown before destroying runner") + <-ar.shutdownCh + ar.destroyImpl() + }() + + return + } + + go ar.destroyImpl() } // IsDestroyed returns true if the alloc runner has been destroyed (stopped and @@ -675,8 +784,21 @@ func (ar *allocRunner) IsWaiting() bool { return ar.prevAllocWatcher.IsWaiting() } -// Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners. +// DestroyCh is a channel that is closed when an allocrunner is closed due to +// an explicit call to Destroy(). +func (ar *allocRunner) DestroyCh() <-chan struct{} { + return ar.destroyCh +} + +// ShutdownCh is a channel that is closed when an allocrunner is closed due to +// either an explicit call to Shutdown(), or Destroy(). +func (ar *allocRunner) ShutdownCh() <-chan struct{} { + return ar.shutdownCh +} + +// Shutdown AllocRunner gracefully. Asynchronously shuts down all TaskRunners. // Tasks are unaffected and may be restored. +// When the shutdown action is completed, it will close ShutdownCh(). 
func (ar *allocRunner) Shutdown() { ar.destroyedLock.Lock() defer ar.destroyedLock.Unlock() @@ -687,29 +809,45 @@ func (ar *allocRunner) Shutdown() { return } - ar.logger.Trace("shutting down") - - // Shutdown tasks gracefully if they were run - if ar.runnersLaunched { - wg := sync.WaitGroup{} - for _, tr := range ar.tasks { - wg.Add(1) - go func(tr *taskrunner.TaskRunner) { - tr.Shutdown() - wg.Done() - }(tr) - } - wg.Wait() + // Destroy is a superset of Shutdown so if it's been marked for destruction, + // don't try to shut down in parallel. If shutdown has been launched, don't + // try again. + if ar.destroyLaunched || ar.shutdownLaunched { + return } - // Wait for Run to exit - <-ar.waitCh + ar.shutdownLaunched = true - // Run shutdown hooks - ar.shutdownHooks() + go func() { + ar.logger.Trace("shutting down") - // Wait for updater to finish its final run - <-ar.taskStateUpdateHandlerCh + // Shutdown tasks gracefully if they were run + if ar.runnersLaunched { // NOTE(review): read here without holding destroyedLock — confirm this is safe + wg := sync.WaitGroup{} + for _, tr := range ar.tasks { + wg.Add(1) + go func(tr *taskrunner.TaskRunner) { + tr.Shutdown() + wg.Done() + }(tr) + } + wg.Wait() + } + + // Wait for Run to exit + <-ar.waitCh + + // Run shutdown hooks + ar.shutdownHooks() + + // Wait for updater to finish its final run + <-ar.taskStateUpdateHandlerCh + + ar.destroyedLock.Lock() + ar.shutdown = true + close(ar.shutdownCh) + ar.destroyedLock.Unlock() + }() } // IsMigrating returns true if the alloc runner is migrating data from its diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 3c1464560..e626fbd1b 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -279,13 +279,64 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { ar2.Destroy() select { - case <-ar2.WaitCh(): + case <-ar2.DestroyCh(): // exited as expected case <-time.After(10 * time.Second): t.Fatalf("timed out waiting for AR to GC") } } +func 
TestAllocRunner_Update_Semantics(t *testing.T) { + t.Parallel() + require := require.New(t) + + updatedAlloc := func(a *structs.Allocation) *structs.Allocation { + upd := a.CopySkipJob() + upd.AllocModifyIndex++ + + return upd + } + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + + ar, err := NewAllocRunner(conf) + require.NoError(err) + + upd1 := updatedAlloc(alloc) + ar.Update(upd1) + + // Update was placed into a queue + require.Len(ar.allocUpdatedCh, 1) + + upd2 := updatedAlloc(upd1) // derive from upd1 so upd2 is strictly newer; two copies of alloc would be deep-equal + ar.Update(upd2) + + // Allocation was _replaced_ + + require.Len(ar.allocUpdatedCh, 1) + queuedAlloc := <-ar.allocUpdatedCh + require.Equal(upd2, queuedAlloc) + + // Requeueing older alloc is skipped + ar.Update(upd2) + ar.Update(upd1) + + queuedAlloc = <-ar.allocUpdatedCh + require.Equal(upd2, queuedAlloc) + + // Ignore after watch closed + + close(ar.waitCh) + + ar.Update(upd1) + + // Did not queue the update + require.Len(ar.allocUpdatedCh, 0) +} + /* import ( diff --git a/client/client.go b/client/client.go index d7b39b077..882b7d42e 100644 --- a/client/client.go +++ b/client/client.go @@ -119,6 +119,8 @@ type AllocRunner interface { StatsReporter() interfaces.AllocStatsReporter Update(*structs.Allocation) WaitCh() <-chan struct{} + DestroyCh() <-chan struct{} + ShutdownCh() <-chan struct{} } // Client is used to implement the client interaction with Nomad. Clients @@ -567,22 +569,21 @@ func (c *Client) Shutdown() error { // Stop Garbage collector c.garbageCollector.Stop() + arGroup := group{} if c.config.DevMode { // In DevMode destroy all the running allocations. 
for _, ar := range c.getAllocRunners() { ar.Destroy() - } - for _, ar := range c.getAllocRunners() { - <-ar.WaitCh() + arGroup.AddCh(ar.DestroyCh()) } } else { // In normal mode call shutdown - arGroup := group{} for _, ar := range c.getAllocRunners() { - arGroup.Go(ar.Shutdown) + ar.Shutdown() + arGroup.AddCh(ar.ShutdownCh()) } - arGroup.Wait() } + arGroup.Wait() c.shutdown = true close(c.shutdownCh) @@ -2682,6 +2683,13 @@ func (g *group) Go(f func()) { }() } +// AddCh adds a goroutine to the group that blocks until ch is closed or yields a value, so Wait also waits on ch. +func (g *group) AddCh(ch <-chan struct{}) { + g.Go(func() { + <-ch + }) +} + // Wait for all goroutines to exit. Must be called after all calls to Go // complete. func (g *group) Wait() { diff --git a/client/client_test.go b/client/client_test.go index e5e68ed1c..f81cfe93f 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -642,7 +642,7 @@ func TestClient_SaveRestoreState(t *testing.T) { } for _, ar := range c2.getAllocRunners() { - <-ar.WaitCh() + <-ar.DestroyCh() } } @@ -780,7 +780,7 @@ func TestClient_BlockedAllocations(t *testing.T) { } for _, ar := range c1.getAllocRunners() { - <-ar.WaitCh() + <-ar.DestroyCh() } } diff --git a/client/gc.go b/client/gc.go index c08c43bdb..2ec640d08 100644 --- a/client/gc.go +++ b/client/gc.go @@ -183,7 +183,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(allocID string, ar AllocRunne ar.Destroy() select { - case <-ar.WaitCh(): + case <-ar.DestroyCh(): case <-a.shutdownCh: }