From 800bd5733388ab9bfe8030392911e4d9c04f803d Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Fri, 14 Dec 2018 16:02:47 +0100 Subject: [PATCH 1/8] allocrunner: Async shutdown and destroy This commit reduces the locking required to shutdown or destroy allocrunners, and allows parallel shutdown and destroy of allocrunners during shutdown. --- client/allocrunner/alloc_runner.go | 147 ++++++++++++++++++------ client/allocrunner/alloc_runner_test.go | 2 +- 2 files changed, 114 insertions(+), 35 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 3a5ab0f6d..9d852beaa 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -64,12 +64,32 @@ type allocRunner struct { // to access. destroyed bool + // destroyCh is closed when the Run() loop has exited, postrun hooks have + // run, and alloc runner has been destroyed. + destroyCh chan struct{} + + // shutdown is true when the Run() loop has exited, and shutdown hooks have + // run. Must acquire destroyedLock to access. + shutdown bool + + // shutdownCh is closed when the Run() loop has exited, and shutdown hooks + // have run. + shutdownCh chan struct{} + // runnersLaunched is true if TaskRunners were Run. Must acquire // destroyedLock to access. runnersLaunched bool - // destroyedLock guards destroyed, runnersLaunched, and serializes - // Shutdown/Destroy calls. + // destroyLaunched is true if Destroy has been called. Must acquire + // destroyedLock to access. + destroyLaunched bool + + // shutdownLaunched is true if Shutdown has been called. Must acquire + // destroyedLock to access. + shutdownLaunched bool + + // destroyedLock guards destroyed, runnersLaunched, destroyLaunched, + // shutdownLaunched, and serializes Shutdown/Destroy calls. destroyedLock sync.Mutex // Alloc captures the allocation being run. @@ -130,6 +150,8 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { vaultClient: config.Vault, tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), waitCh: make(chan struct{}), + destroyCh: make(chan struct{}), + shutdownCh: make(chan struct{}), state: &state.State{}, stateDB: config.StateDB, stateUpdater: config.StateUpdater, @@ -615,19 +637,7 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener { return ar.allocBroadcaster.Listen() } -// Destroy the alloc runner by synchronously stopping it if it is still running -// and cleaning up all of its resources. -// -// This method is safe for calling concurrently with Run() and will cause it to -// exit (thus closing WaitCh). -func (ar *allocRunner) Destroy() { - ar.destroyedLock.Lock() - defer ar.destroyedLock.Unlock() - if ar.destroyed { - // Only destroy once - return - } - +func (ar *allocRunner) destroyImpl() { // Stop any running tasks and persist states in case the client is // shutdown before Destroy finishes. states := ar.killTasks() @@ -652,7 +662,52 @@ func (ar *allocRunner) Destroy() { } // Mark alloc as destroyed + ar.destroyedLock.Lock() + + if !ar.shutdown { + ar.shutdown = true + close(ar.shutdownCh) + } + ar.destroyed = true + close(ar.destroyCh) + + ar.destroyedLock.Unlock() +} + +// Destroy the alloc runner by stopping it if it is still running and cleaning +// up all of its resources. +// +// This method is safe for calling concurrently with Run() and will cause it to +// exit (thus closing WaitCh). +func (ar *allocRunner) Destroy() { + ar.destroyedLock.Lock() + defer ar.destroyedLock.Unlock() + + if ar.destroyed { + // Only destroy once + return + } + + if ar.destroyLaunched { + // Only dispatch a destroy once + return + } + + ar.destroyLaunched = true + + // Synchronize calls to shutdown/destroy + if ar.shutdownLaunched { + go func() { + ar.logger.Debug("Waiting for shutdown before destroying runner") + <-ar.shutdownCh + ar.destroyImpl() + }() + + return + } + + go ar.destroyImpl() } // IsDestroyed returns true if the alloc runner has been destroyed (stopped and @@ -675,6 +730,14 @@ func (ar *allocRunner) IsWaiting() bool { return ar.prevAllocWatcher.IsWaiting() } +func (ar *allocRunner) DestroyCh() <-chan struct{} { + return ar.destroyCh +} + +func (ar *allocRunner) ShutdownCh() <-chan struct{} { + return ar.shutdownCh +} + // Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners. // Tasks are unaffected and may be restored. func (ar *allocRunner) Shutdown() { @@ -687,29 +750,45 @@ func (ar *allocRunner) Shutdown() { return } - ar.logger.Trace("shutting down") - - // Shutdown tasks gracefully if they were run - if ar.runnersLaunched { - wg := sync.WaitGroup{} - for _, tr := range ar.tasks { - wg.Add(1) - go func(tr *taskrunner.TaskRunner) { - tr.Shutdown() - wg.Done() - }(tr) - } - wg.Wait() + // Destroy is a superset of Shutdown so if it's been marked for destruction, + // don't try and shutdown in parallel. If shutdown has been launched, don't + // try again. + if ar.destroyLaunched || ar.shutdownLaunched { + return } - // Wait for Run to exit - <-ar.waitCh + ar.shutdownLaunched = true - // Run shutdown hooks - ar.shutdownHooks() + go func() { + ar.logger.Trace("shutting down") - // Wait for updater to finish its final run - <-ar.taskStateUpdateHandlerCh + // Shutdown tasks gracefully if they were run + if ar.runnersLaunched { + wg := sync.WaitGroup{} + for _, tr := range ar.tasks { + wg.Add(1) + go func(tr *taskrunner.TaskRunner) { + tr.Shutdown() + wg.Done() + }(tr) + } + wg.Wait() + } + + // Wait for Run to exit + <-ar.waitCh + + // Run shutdown hooks + ar.shutdownHooks() + + // Wait for updater to finish its final run + <-ar.taskStateUpdateHandlerCh + + ar.destroyedLock.Lock() + ar.shutdown = true + close(ar.shutdownCh) + ar.destroyedLock.Unlock() + }() } // IsMigrating returns true if the alloc runner is migrating data from its diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 3c1464560..55fa0ea17 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -279,7 +279,7 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { ar2.Destroy() select { - case <-ar2.WaitCh(): + case <-ar2.DestroyCh(): // exited as expected case <-time.After(10 * time.Second): t.Fatalf("timed out waiting for AR to GC") From 934d2e6bf6e14f80d5b6ab46dc36cc3a73be1be4 Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Fri, 14 Dec 2018 16:04:58 +0100 Subject: [PATCH 2/8] client: Async API for shutdown/destroy allocrunners --- client/client.go | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/client/client.go b/client/client.go index d7b39b077..882b7d42e 100644 --- a/client/client.go +++ b/client/client.go @@ -119,6 +119,8 @@ type AllocRunner interface { StatsReporter() interfaces.AllocStatsReporter Update(*structs.Allocation) WaitCh() <-chan struct{} + DestroyCh() <-chan struct{} + ShutdownCh() <-chan struct{} } // Client is used to implement the client interaction with Nomad. Clients @@ -567,22 +569,21 @@ func (c *Client) Shutdown() error { // Stop Garbage collector c.garbageCollector.Stop() + arGroup := group{} if c.config.DevMode { // In DevMode destroy all the running allocations. for _, ar := range c.getAllocRunners() { ar.Destroy() - } - for _, ar := range c.getAllocRunners() { - <-ar.WaitCh() + arGroup.AddCh(ar.DestroyCh()) } } else { // In normal mode call shutdown - arGroup := group{} for _, ar := range c.getAllocRunners() { - arGroup.Go(ar.Shutdown) + ar.Shutdown() + arGroup.AddCh(ar.ShutdownCh()) } - arGroup.Wait() } + arGroup.Wait() c.shutdown = true close(c.shutdownCh) @@ -2682,6 +2683,12 @@ func (g *group) Go(f func()) { }() } +func (c *group) AddCh(ch <-chan struct{}) { + c.Go(func() { + <-ch + }) +} + // Wait for all goroutines to exit. Must be called after all calls to Go // complete. func (g *group) Wait() { From 6f636ea15aa741d7b4d94aeffc6cf680d6382d4d Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Fri, 14 Dec 2018 16:05:16 +0100 Subject: [PATCH 3/8] gc: Wait for allocrunners to be destroyed --- client/gc.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/client/gc.go b/client/gc.go index c08c43bdb..2ec640d08 100644 --- a/client/gc.go +++ b/client/gc.go @@ -183,7 +183,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(allocID string, ar AllocRunne ar.Destroy() select { - case <-ar.WaitCh(): + case <-ar.DestroyCh(): case <-a.shutdownCh: } From 69fc73767a31ed737db3b83e5cccaacedb3f3174 Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Mon, 17 Dec 2018 13:27:54 +0100 Subject: [PATCH 4/8] allocrunner: Handle updates asynchronously This creates a new buffered channel and goroutine on the allocrunner for serializing updates to allocations. This allows us to take updates off the routine that is used from processing updates from the server, without having complicated machinery for tracking update lifetimes, or other external synchronization. This results in a nice performance improvement and signficantly better throughput on batch changes such as preempting a large number of jobs for a larger placement. --- client/allocrunner/alloc_runner.go | 48 +++++++++++++++++++++++-- client/allocrunner/alloc_runner_test.go | 42 ++++++++++++++++++++++ 2 files changed, 87 insertions(+), 3 deletions(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 9d852beaa..8c1550994 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -49,6 +49,12 @@ type allocRunner struct { // before this goroutine exits. taskStateUpdateHandlerCh chan struct{} + // allocUpdatedCh is a channel that is used to stream allocation updates into + // the allocUpdate handler. Must have len==1 to allow nonblocking notification + // of new allocation updates while the goroutine is processing a previous + // update. + allocUpdatedCh chan *structs.Allocation + // consulClient is the client used by the consul service hook for // registering services and checks consulClient consul.ConsulServiceAPI @@ -157,6 +163,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { stateUpdater: config.StateUpdater, taskStateUpdatedCh: make(chan struct{}, 1), taskStateUpdateHandlerCh: make(chan struct{}), + allocUpdatedCh: make(chan *structs.Allocation, 1), deviceStatsReporter: config.DeviceStatsReporter, prevAllocWatcher: config.PrevAllocWatcher, prevAllocMigrator: config.PrevAllocMigrator, @@ -226,6 +233,9 @@ func (ar *allocRunner) Run() { // Start the task state update handler go ar.handleTaskStateUpdates() + // Start the alloc update handler + go ar.handleAllocUpdates() + // If an alloc should not be run, ensure any restored task handles are // destroyed and exit to wait for the AR to be GC'd by the client. if !ar.shouldRun() { @@ -601,12 +611,44 @@ func (ar *allocRunner) AllocState() *state.State { return state } -// Update the running allocation with a new version received from the server. -// +// Update asyncronously updates the running allocation with a new version +// received from the server. +// When processing a new update, we will first attempt to drain stale updates +// from the queue, before appending the new one. +func (ar *allocRunner) Update(update *structs.Allocation) { + select { + // Drain queued update from the channel if possible, and check the modify + // index + case oldUpdate := <-ar.allocUpdatedCh: + // If the old update is newer than the replacement, then skip the new one + // and return. This case shouldn't happen, but may in the case of a bug + // elsewhere inside the system. + if oldUpdate.AllocModifyIndex > update.AllocModifyIndex { + ar.allocUpdatedCh <- oldUpdate + return + } + default: + } + + // Queue the new update + ar.allocUpdatedCh <- update +} + +func (ar *allocRunner) handleAllocUpdates() { + for { + select { + case update := <-ar.allocUpdatedCh: + ar.handleAllocUpdate(update) + case <-ar.waitCh: + break + } + } +} + // This method sends the updated alloc to Run for serially processing updates. // If there is already a pending update it will be discarded and replaced by // the latest update. -func (ar *allocRunner) Update(update *structs.Allocation) { +func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) { // Detect Stop updates stopping := !ar.Alloc().TerminalStatus() && update.TerminalStatus() diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 55fa0ea17..eeedef59f 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -286,6 +286,48 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { } } +func TestAllocRunner_Update_Semantics(t *testing.T) { + t.Parallel() + require := require.New(t) + + updatedAlloc := func(a *structs.Allocation) *structs.Allocation { + upd := a.CopySkipJob() + upd.AllocModifyIndex++ + + return upd + } + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + + ar, err := NewAllocRunner(conf) + require.NoError(err) + + upd1 := updatedAlloc(alloc) + ar.Update(upd1) + + // Update was placed into a queue + require.Len(ar.allocUpdatedCh, 1) + + upd2 := updatedAlloc(alloc) + ar.Update(upd2) + + // Allocation was _replaced_ + + require.Len(ar.allocUpdatedCh, 1) + queuedAlloc := <-ar.allocUpdatedCh + require.Equal(upd2, queuedAlloc) + + // Requeueing older alloc is skipped + ar.Update(upd2) + ar.Update(upd1) + + queuedAlloc = <-ar.allocUpdatedCh + require.Equal(upd2, queuedAlloc) +} + /* import ( From 9f1b53f2a8e39ed6cf26ab22ebd06fdc6bfde663 Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Mon, 17 Dec 2018 15:55:36 +0100 Subject: [PATCH 5/8] fixup: Log when we detect out of order updates --- client/allocrunner/alloc_runner.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 8c1550994..8e1b7ff5e 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -624,6 +624,9 @@ func (ar *allocRunner) Update(update *structs.Allocation) { // and return. This case shouldn't happen, but may in the case of a bug // elsewhere inside the system. if oldUpdate.AllocModifyIndex > update.AllocModifyIndex { + ar.logger.Warn("Discarding allocation update due to newer alloc revision in queue", + "old_modify_index", oldUpdate.AllocModifyIndex, + "new_modify_index", update.AllocModifyIndex) ar.allocUpdatedCh <- oldUpdate return } From 5464a9565ac48203a49554a8d71b63424bb5227a Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Tue, 18 Dec 2018 23:28:48 +0100 Subject: [PATCH 6/8] allocrunner: Documentation for ShutdownCh/DestroyCh --- client/allocrunner/alloc_runner.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 8e1b7ff5e..419b199c6 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -725,6 +725,7 @@ func (ar *allocRunner) destroyImpl() { // // This method is safe for calling concurrently with Run() and will cause it to // exit (thus closing WaitCh). +// When the destroy action is completed, it will close DestroyCh(). func (ar *allocRunner) Destroy() { ar.destroyedLock.Lock() defer ar.destroyedLock.Unlock() @@ -775,16 +776,21 @@ func (ar *allocRunner) IsWaiting() bool { return ar.prevAllocWatcher.IsWaiting() } +// DestroyCh is a channel that is closed when an allocrunner is closed due to +// an explicit call to Destroy(). func (ar *allocRunner) DestroyCh() <-chan struct{} { return ar.destroyCh } +// ShutdownCh is a channel that is closed when an allocrunner is closed due to +// either an explicit call to Shutdown(), or Destroy(). func (ar *allocRunner) ShutdownCh() <-chan struct{} { return ar.shutdownCh } -// Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners. +// Shutdown AllocRunner gracefully. Asynchronously shuts down all TaskRunners. // Tasks are unaffected and may be restored. +// When the destroy action is completed, it will close ShutdownCh(). func (ar *allocRunner) Shutdown() { ar.destroyedLock.Lock() defer ar.destroyedLock.Unlock() From 502f36335e7b321efbd408f504db0003c4875ddb Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Tue, 18 Dec 2018 23:36:29 +0100 Subject: [PATCH 7/8] allocrunner: Drop and log updates after closing waitCh --- client/allocrunner/alloc_runner.go | 8 ++++++++ client/allocrunner/alloc_runner_test.go | 9 +++++++++ 2 files changed, 17 insertions(+) diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 419b199c6..80fc1d6a1 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -629,7 +629,15 @@ func (ar *allocRunner) Update(update *structs.Allocation) { "new_modify_index", update.AllocModifyIndex) ar.allocUpdatedCh <- oldUpdate return + } else { + ar.logger.Trace("Discarding allocation update", + "skipped_modify_index", oldUpdate.AllocModifyIndex, + "new_modify_index", update.AllocModifyIndex) } + case <-ar.waitCh: + ar.logger.Trace("AllocRunner has terminated, skipping alloc update", + "modify_index", update.AllocModifyIndex) + return default: } diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index eeedef59f..e626fbd1b 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -326,6 +326,15 @@ func TestAllocRunner_Update_Semantics(t *testing.T) { queuedAlloc = <-ar.allocUpdatedCh require.Equal(upd2, queuedAlloc) + + // Ignore after watch closed + + close(ar.waitCh) + + ar.Update(upd1) + + // Did not queue the update + require.Len(ar.allocUpdatedCh, 0) } /* From f619db297f0299e882f6b44381a06b0790f658d3 Mon Sep 17 00:00:00 2001 From: Danielle Tomlinson Date: Tue, 18 Dec 2018 23:38:28 +0100 Subject: [PATCH 8/8] client: Update tests for async destroy --- client/client_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/client/client_test.go b/client/client_test.go index e5e68ed1c..f81cfe93f 100644 --- a/client/client_test.go +++ b/client/client_test.go @@ -642,7 +642,7 @@ func TestClient_SaveRestoreState(t *testing.T) { } for _, ar := range c2.getAllocRunners() { - <-ar.WaitCh() + <-ar.DestroyCh() } } @@ -780,7 +780,7 @@ func TestClient_BlockedAllocations(t *testing.T) { } for _, ar := range c1.getAllocRunners() { - <-ar.WaitCh() + <-ar.DestroyCh() } }