Merge pull request #5007 from hashicorp/dani/f-allocrunner-async

allocrunner: Async API for shutdown/destroy/update
Danielle Tomlinson
2018-12-19 01:26:41 +01:00
committed by GitHub
5 changed files with 244 additions and 48 deletions
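At a high level, this changes Destroy() and Shutdown() from blocking calls into fire-and-forget dispatches paired with completion channels, and funnels Update() through a one-slot queue. A minimal sketch of the new caller-side pattern (the 10-second timeout is illustrative, mirroring the tests below):

ar.Destroy() // returns immediately; teardown runs in a background goroutine

select {
case <-ar.DestroyCh():
// closed once postrun hooks have run and cleanup is complete
case <-time.After(10 * time.Second):
// give up waiting; the runner may be wedged
}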


@@ -49,6 +49,12 @@ type allocRunner struct {
// before this goroutine exits.
taskStateUpdateHandlerCh chan struct{}
// allocUpdatedCh is a channel used to stream allocation updates into
// the allocUpdate handler. Must be buffered with a capacity of 1 so
// that queueing a new allocation update never blocks, even while the
// goroutine is processing a previous update.
allocUpdatedCh chan *structs.Allocation
// consulClient is the client used by the consul service hook for
// registering services and checks
consulClient consul.ConsulServiceAPI
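Both taskStateUpdatedCh and the new allocUpdatedCh rely on that one-slot buffer so producers never block. For the pure notification channel the send side coalesces wakeups with a select/default; a sketch of that variant (not lifted from this diff):

// Coalescing notification: if a wakeup is already pending, drop the new
// one instead of blocking; the consumer will pick up the pending wakeup.
notifyCh := make(chan struct{}, 1)

notify := func() {
select {
case notifyCh <- struct{}{}: // queued a wakeup
default: // one is already pending; coalesce
}
}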
@@ -64,12 +70,32 @@ type allocRunner struct {
// to access.
destroyed bool
// destroyCh is closed when the Run() loop has exited, postrun hooks have
// run, and alloc runner has been destroyed.
destroyCh chan struct{}
// shutdown is true when the Run() loop has exited, and shutdown hooks have
// run. Must acquire destroyedLock to access.
shutdown bool
// shutdownCh is closed when the Run() loop has exited, and shutdown hooks
// have run.
shutdownCh chan struct{}
// runnersLaunched is true if TaskRunners were Run. Must acquire
// destroyedLock to access.
runnersLaunched bool
// destroyedLock guards destroyed, runnersLaunched, and serializes
// Shutdown/Destroy calls.
// destroyLaunched is true if Destroy has been called. Must acquire
// destroyedLock to access.
destroyLaunched bool
// shutdownLaunched is true if Shutdown has been called. Must acquire
// destroyedLock to access.
shutdownLaunched bool
// destroyedLock guards destroyed, runnersLaunched, destroyLaunched,
// shutdownLaunched, and serializes Shutdown/Destroy calls.
destroyedLock sync.Mutex
// Alloc captures the allocation being run.
@@ -130,11 +156,14 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
vaultClient: config.Vault,
tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)),
waitCh: make(chan struct{}),
destroyCh: make(chan struct{}),
shutdownCh: make(chan struct{}),
state: &state.State{},
stateDB: config.StateDB,
stateUpdater: config.StateUpdater,
taskStateUpdatedCh: make(chan struct{}, 1),
taskStateUpdateHandlerCh: make(chan struct{}),
allocUpdatedCh: make(chan *structs.Allocation, 1),
deviceStatsReporter: config.DeviceStatsReporter,
prevAllocWatcher: config.PrevAllocWatcher,
prevAllocMigrator: config.PrevAllocMigrator,
@@ -204,6 +233,9 @@ func (ar *allocRunner) Run() {
// Start the task state update handler
go ar.handleTaskStateUpdates()
// Start the alloc update handler
go ar.handleAllocUpdates()
// If an alloc should not be run, ensure any restored task handles are
// destroyed and exit to wait for the AR to be GC'd by the client.
if !ar.shouldRun() {
@@ -579,12 +611,55 @@ func (ar *allocRunner) AllocState() *state.State {
return state
}
// Update the running allocation with a new version received from the server.
//
// Update asynchronously updates the running allocation with a new
// version received from the server. When processing a new update, we
// first drain any stale update from the queue before appending the
// new one.
func (ar *allocRunner) Update(update *structs.Allocation) {
select {
// Drain queued update from the channel if possible, and check the modify
// index
case oldUpdate := <-ar.allocUpdatedCh:
// If the queued update is newer than the replacement, skip the new one
// and return. This case shouldn't happen, but may if there is a bug
// elsewhere in the system.
if oldUpdate.AllocModifyIndex > update.AllocModifyIndex {
ar.logger.Warn("Discarding allocation update due to newer alloc revision in queue",
"old_modify_index", oldUpdate.AllocModifyIndex,
"new_modify_index", update.AllocModifyIndex)
ar.allocUpdatedCh <- oldUpdate
return
} else {
ar.logger.Trace("Discarding allocation update",
"skipped_modify_index", oldUpdate.AllocModifyIndex,
"new_modify_index", update.AllocModifyIndex)
}
case <-ar.waitCh:
ar.logger.Trace("AllocRunner has terminated, skipping alloc update",
"modify_index", update.AllocModifyIndex)
return
default:
}
// Queue the new update
ar.allocUpdatedCh <- update
}
func (ar *allocRunner) handleAllocUpdates() {
for {
select {
case update := <-ar.allocUpdatedCh:
ar.handleAllocUpdate(update)
case <-ar.waitCh:
// Run has exited; stop processing updates. Note: a bare break here
// would only exit the select, not the for loop.
return
}
}
}
// This method sends the updated alloc to Run for serial processing of
// updates. If there is already a pending update, it will be discarded
// and replaced by the latest update.
func (ar *allocRunner) Update(update *structs.Allocation) {
func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) {
// Detect Stop updates
stopping := !ar.Alloc().TerminalStatus() && update.TerminalStatus()
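For reference, the drain-then-append semantics above can be demonstrated in isolation; a self-contained sketch (plain Go, not Nomad code, with a hypothetical alloc type standing in for structs.Allocation):

package main

import "fmt"

type alloc struct{ modifyIndex uint64 }

func main() {
ch := make(chan *alloc, 1)

update := func(a *alloc) {
select {
case old := <-ch:
if old.modifyIndex > a.modifyIndex {
ch <- old // queued version is newer; keep it and drop the new one
return
}
// otherwise the stale queued value is discarded
default:
}
ch <- a // buffer is empty here, so this never blocks
}

update(&alloc{modifyIndex: 10})
update(&alloc{modifyIndex: 12}) // replaces 10 before it is consumed
update(&alloc{modifyIndex: 11}) // discarded: 12 is already queued
fmt.Println((<-ch).modifyIndex) // prints 12
}

Note the single-producer assumption: with concurrent callers, another update could refill the buffer between the drain and the send.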
@@ -615,19 +690,7 @@ func (ar *allocRunner) Listener() *cstructs.AllocListener {
return ar.allocBroadcaster.Listen()
}
// Destroy the alloc runner by synchronously stopping it if it is still running
// and cleaning up all of its resources.
//
// This method is safe for calling concurrently with Run() and will cause it to
// exit (thus closing WaitCh).
func (ar *allocRunner) Destroy() {
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()
if ar.destroyed {
// Only destroy once
return
}
func (ar *allocRunner) destroyImpl() {
// Stop any running tasks and persist states in case the client is
// shutdown before Destroy finishes.
states := ar.killTasks()
@@ -652,7 +715,53 @@ func (ar *allocRunner) Destroy() {
}
// Mark alloc as destroyed
ar.destroyedLock.Lock()
if !ar.shutdown {
ar.shutdown = true
close(ar.shutdownCh)
}
ar.destroyed = true
close(ar.destroyCh)
ar.destroyedLock.Unlock()
}
// Destroy the alloc runner by stopping it if it is still running and cleaning
// up all of its resources.
//
// This method is safe for calling concurrently with Run() and will cause it to
// exit (thus closing WaitCh).
// When the destroy is complete, DestroyCh() is closed.
func (ar *allocRunner) Destroy() {
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()
if ar.destroyed {
// Only destroy once
return
}
if ar.destroyLaunched {
// Only dispatch a destroy once
return
}
ar.destroyLaunched = true
// Synchronize calls to shutdown/destroy
if ar.shutdownLaunched {
go func() {
ar.logger.Debug("Waiting for shutdown before destroying runner")
<-ar.shutdownCh
ar.destroyImpl()
}()
return
}
go ar.destroyImpl()
}
// IsDestroyed returns true if the alloc runner has been destroyed (stopped and
@@ -675,8 +784,21 @@ func (ar *allocRunner) IsWaiting() bool {
return ar.prevAllocWatcher.IsWaiting()
}
// Shutdown AllocRunner gracefully. Blocks while shutting down all TaskRunners.
// DestroyCh is a channel that is closed when an allocrunner is closed due to
// an explicit call to Destroy().
func (ar *allocRunner) DestroyCh() <-chan struct{} {
return ar.destroyCh
}
// ShutdownCh is a channel that is closed when an allocrunner is closed due to
// either an explicit call to Shutdown(), or Destroy().
func (ar *allocRunner) ShutdownCh() <-chan struct{} {
return ar.shutdownCh
}
// Shutdown AllocRunner gracefully. Asynchronously shuts down all TaskRunners.
// Tasks are unaffected and may be restored.
// When the shutdown is complete, ShutdownCh() is closed.
func (ar *allocRunner) Shutdown() {
ar.destroyedLock.Lock()
defer ar.destroyedLock.Unlock()
@@ -687,29 +809,45 @@ func (ar *allocRunner) Shutdown() {
return
}
ar.logger.Trace("shutting down")
// Shutdown tasks gracefully if they were run
if ar.runnersLaunched {
wg := sync.WaitGroup{}
for _, tr := range ar.tasks {
wg.Add(1)
go func(tr *taskrunner.TaskRunner) {
tr.Shutdown()
wg.Done()
}(tr)
}
wg.Wait()
// Destroy is a superset of Shutdown, so if the runner is already marked
// for destruction, don't also try to shut down in parallel. If a shutdown
// has already been launched, don't launch another.
if ar.destroyLaunched || ar.shutdownLaunched {
return
}
// Wait for Run to exit
<-ar.waitCh
ar.shutdownLaunched = true
// Run shutdown hooks
ar.shutdownHooks()
go func() {
ar.logger.Trace("shutting down")
// Wait for updater to finish its final run
<-ar.taskStateUpdateHandlerCh
// Shutdown tasks gracefully if they were run
if ar.runnersLaunched {
wg := sync.WaitGroup{}
for _, tr := range ar.tasks {
wg.Add(1)
go func(tr *taskrunner.TaskRunner) {
tr.Shutdown()
wg.Done()
}(tr)
}
wg.Wait()
}
// Wait for Run to exit
<-ar.waitCh
// Run shutdown hooks
ar.shutdownHooks()
// Wait for updater to finish its final run
<-ar.taskStateUpdateHandlerCh
ar.destroyedLock.Lock()
ar.shutdown = true
close(ar.shutdownCh)
ar.destroyedLock.Unlock()
}()
}
// IsMigrating returns true if the alloc runner is migrating data from its

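Because Destroy is a superset of Shutdown and the two are serialized through the launched flags, a caller that triggers both can simply block on the corresponding channels; a hypothetical usage sketch:

ar.Shutdown() // dispatches graceful shutdown in the background
ar.Destroy()  // sees shutdownLaunched and waits on shutdownCh internally
              // before running destroyImpl

<-ar.ShutdownCh() // closed after shutdown hooks have run
<-ar.DestroyCh()  // closed after postrun hooks and full cleanup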

@@ -279,13 +279,64 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
ar2.Destroy()
select {
case <-ar2.WaitCh():
case <-ar2.DestroyCh():
// exited as expected
case <-time.After(10 * time.Second):
t.Fatalf("timed out waiting for AR to GC")
}
}
func TestAllocRunner_Update_Semantics(t *testing.T) {
t.Parallel()
require := require.New(t)
updatedAlloc := func(a *structs.Allocation) *structs.Allocation {
upd := a.CopySkipJob()
upd.AllocModifyIndex++
return upd
}
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(err)
upd1 := updatedAlloc(alloc)
ar.Update(upd1)
// Update was placed into a queue
require.Len(ar.allocUpdatedCh, 1)
upd2 := updatedAlloc(alloc)
ar.Update(upd2)
// Allocation was _replaced_
require.Len(ar.allocUpdatedCh, 1)
queuedAlloc := <-ar.allocUpdatedCh
require.Equal(upd2, queuedAlloc)
// Requeueing older alloc is skipped
ar.Update(upd2)
ar.Update(upd1)
queuedAlloc = <-ar.allocUpdatedCh
require.Equal(upd2, queuedAlloc)
// Updates are ignored once the runner's wait channel is closed
close(ar.waitCh)
ar.Update(upd1)
// Did not queue the update
require.Len(ar.allocUpdatedCh, 0)
}
/*
import (


@@ -119,6 +119,8 @@ type AllocRunner interface {
StatsReporter() interfaces.AllocStatsReporter
Update(*structs.Allocation)
WaitCh() <-chan struct{}
DestroyCh() <-chan struct{}
ShutdownCh() <-chan struct{}
}
// Client is used to implement the client interaction with Nomad. Clients
@@ -567,22 +569,21 @@ func (c *Client) Shutdown() error {
// Stop Garbage collector
c.garbageCollector.Stop()
arGroup := group{}
if c.config.DevMode {
// In DevMode destroy all the running allocations.
for _, ar := range c.getAllocRunners() {
ar.Destroy()
}
for _, ar := range c.getAllocRunners() {
<-ar.WaitCh()
arGroup.AddCh(ar.DestroyCh())
}
} else {
// In normal mode call shutdown
arGroup := group{}
for _, ar := range c.getAllocRunners() {
arGroup.Go(ar.Shutdown)
ar.Shutdown()
arGroup.AddCh(ar.ShutdownCh())
}
arGroup.Wait()
}
arGroup.Wait()
c.shutdown = true
close(c.shutdownCh)
@@ -2682,6 +2683,12 @@ func (g *group) Go(f func()) {
}()
}
// AddCh waits for the given channel to be closed before the group's
// Wait returns.
func (g *group) AddCh(ch <-chan struct{}) {
g.Go(func() {
<-ch
})
}
// Wait for all goroutines to exit. Must be called after all calls to Go
// complete.
func (g *group) Wait() {
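The group helper lets Client.Shutdown treat function-based and channel-based completion uniformly; a standalone sketch of how the two entry points compose:

// Wait on a mix of shutdown functions and completion channels.
var g group

g.Go(func() { /* some blocking shutdown work */ })

done := make(chan struct{})
g.AddCh(done) // equivalent to g.Go(func() { <-done })

close(done)
g.Wait() // returns once every tracked goroutine has exited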


@@ -642,7 +642,7 @@ func TestClient_SaveRestoreState(t *testing.T) {
}
for _, ar := range c2.getAllocRunners() {
<-ar.WaitCh()
<-ar.DestroyCh()
}
}
@@ -780,7 +780,7 @@ func TestClient_BlockedAllocations(t *testing.T) {
}
for _, ar := range c1.getAllocRunners() {
<-ar.WaitCh()
<-ar.DestroyCh()
}
}


@@ -183,7 +183,7 @@ func (a *AllocGarbageCollector) destroyAllocRunner(allocID string, ar AllocRunne
ar.Destroy()
select {
case <-ar.WaitCh():
case <-ar.DestroyCh():
case <-a.shutdownCh:
}