allocrunner: Handle updates asynchronously

This creates a new buffered channel and goroutine on the allocrunner for
serializing updates to allocations. This allows us to take updates off
the routine that is used from processing updates from the server,
without having complicated machinery for tracking update lifetimes, or
other external synchronization.

This results in a nice performance improvement and signficantly better
throughput on batch changes such as preempting a large number of jobs
for a larger placement.
This commit is contained in:
Danielle Tomlinson
2018-12-17 13:27:54 +01:00
parent 6f636ea15a
commit 69fc73767a
2 changed files with 87 additions and 3 deletions

View File

@@ -49,6 +49,12 @@ type allocRunner struct {
// before this goroutine exits.
taskStateUpdateHandlerCh chan struct{}
// allocUpdatedCh is a channel that is used to stream allocation updates into
// the allocUpdate handler. Must have len==1 to allow nonblocking notification
// of new allocation updates while the goroutine is processing a previous
// update.
allocUpdatedCh chan *structs.Allocation
// consulClient is the client used by the consul service hook for
// registering services and checks
consulClient consul.ConsulServiceAPI
@@ -157,6 +163,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) {
stateUpdater: config.StateUpdater,
taskStateUpdatedCh: make(chan struct{}, 1),
taskStateUpdateHandlerCh: make(chan struct{}),
allocUpdatedCh: make(chan *structs.Allocation, 1),
deviceStatsReporter: config.DeviceStatsReporter,
prevAllocWatcher: config.PrevAllocWatcher,
prevAllocMigrator: config.PrevAllocMigrator,
@@ -226,6 +233,9 @@ func (ar *allocRunner) Run() {
// Start the task state update handler
go ar.handleTaskStateUpdates()
// Start the alloc update handler
go ar.handleAllocUpdates()
// If an alloc should not be run, ensure any restored task handles are
// destroyed and exit to wait for the AR to be GC'd by the client.
if !ar.shouldRun() {
@@ -601,12 +611,44 @@ func (ar *allocRunner) AllocState() *state.State {
return state
}
// Update the running allocation with a new version received from the server.
//
// Update asyncronously updates the running allocation with a new version
// received from the server.
// When processing a new update, we will first attempt to drain stale updates
// from the queue, before appending the new one.
func (ar *allocRunner) Update(update *structs.Allocation) {
select {
// Drain queued update from the channel if possible, and check the modify
// index
case oldUpdate := <-ar.allocUpdatedCh:
// If the old update is newer than the replacement, then skip the new one
// and return. This case shouldn't happen, but may in the case of a bug
// elsewhere inside the system.
if oldUpdate.AllocModifyIndex > update.AllocModifyIndex {
ar.allocUpdatedCh <- oldUpdate
return
}
default:
}
// Queue the new update
ar.allocUpdatedCh <- update
}
func (ar *allocRunner) handleAllocUpdates() {
for {
select {
case update := <-ar.allocUpdatedCh:
ar.handleAllocUpdate(update)
case <-ar.waitCh:
break
}
}
}
// This method sends the updated alloc to Run for serially processing updates.
// If there is already a pending update it will be discarded and replaced by
// the latest update.
func (ar *allocRunner) Update(update *structs.Allocation) {
func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) {
// Detect Stop updates
stopping := !ar.Alloc().TerminalStatus() && update.TerminalStatus()

View File

@@ -286,6 +286,48 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) {
}
}
func TestAllocRunner_Update_Semantics(t *testing.T) {
t.Parallel()
require := require.New(t)
updatedAlloc := func(a *structs.Allocation) *structs.Allocation {
upd := a.CopySkipJob()
upd.AllocModifyIndex++
return upd
}
alloc := mock.Alloc()
alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver"
conf, cleanup := testAllocRunnerConfig(t, alloc)
defer cleanup()
ar, err := NewAllocRunner(conf)
require.NoError(err)
upd1 := updatedAlloc(alloc)
ar.Update(upd1)
// Update was placed into a queue
require.Len(ar.allocUpdatedCh, 1)
upd2 := updatedAlloc(alloc)
ar.Update(upd2)
// Allocation was _replaced_
require.Len(ar.allocUpdatedCh, 1)
queuedAlloc := <-ar.allocUpdatedCh
require.Equal(upd2, queuedAlloc)
// Requeueing older alloc is skipped
ar.Update(upd2)
ar.Update(upd1)
queuedAlloc = <-ar.allocUpdatedCh
require.Equal(upd2, queuedAlloc)
}
/*
import (