diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index 9d852beaa..8c1550994 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -49,6 +49,12 @@ type allocRunner struct { // before this goroutine exits. taskStateUpdateHandlerCh chan struct{} + // allocUpdatedCh is a channel that is used to stream allocation updates into + // the allocUpdate handler. Must have len==1 to allow nonblocking notification + // of new allocation updates while the goroutine is processing a previous + // update. + allocUpdatedCh chan *structs.Allocation + // consulClient is the client used by the consul service hook for // registering services and checks consulClient consul.ConsulServiceAPI @@ -157,6 +163,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { stateUpdater: config.StateUpdater, taskStateUpdatedCh: make(chan struct{}, 1), taskStateUpdateHandlerCh: make(chan struct{}), + allocUpdatedCh: make(chan *structs.Allocation, 1), deviceStatsReporter: config.DeviceStatsReporter, prevAllocWatcher: config.PrevAllocWatcher, prevAllocMigrator: config.PrevAllocMigrator, @@ -226,6 +233,9 @@ func (ar *allocRunner) Run() { // Start the task state update handler go ar.handleTaskStateUpdates() + // Start the alloc update handler + go ar.handleAllocUpdates() + // If an alloc should not be run, ensure any restored task handles are // destroyed and exit to wait for the AR to be GC'd by the client. if !ar.shouldRun() { @@ -601,12 +611,44 @@ func (ar *allocRunner) AllocState() *state.State { return state } -// Update the running allocation with a new version received from the server. -// +// Update asyncronously updates the running allocation with a new version +// received from the server. +// When processing a new update, we will first attempt to drain stale updates +// from the queue, before appending the new one. +func (ar *allocRunner) Update(update *structs.Allocation) { + select { + // Drain queued update from the channel if possible, and check the modify + // index + case oldUpdate := <-ar.allocUpdatedCh: + // If the old update is newer than the replacement, then skip the new one + // and return. This case shouldn't happen, but may in the case of a bug + // elsewhere inside the system. + if oldUpdate.AllocModifyIndex > update.AllocModifyIndex { + ar.allocUpdatedCh <- oldUpdate + return + } + default: + } + + // Queue the new update + ar.allocUpdatedCh <- update +} + +func (ar *allocRunner) handleAllocUpdates() { + for { + select { + case update := <-ar.allocUpdatedCh: + ar.handleAllocUpdate(update) + case <-ar.waitCh: + break + } + } +} + // This method sends the updated alloc to Run for serially processing updates. // If there is already a pending update it will be discarded and replaced by // the latest update. -func (ar *allocRunner) Update(update *structs.Allocation) { +func (ar *allocRunner) handleAllocUpdate(update *structs.Allocation) { // Detect Stop updates stopping := !ar.Alloc().TerminalStatus() && update.TerminalStatus() diff --git a/client/allocrunner/alloc_runner_test.go b/client/allocrunner/alloc_runner_test.go index 55fa0ea17..eeedef59f 100644 --- a/client/allocrunner/alloc_runner_test.go +++ b/client/allocrunner/alloc_runner_test.go @@ -286,6 +286,48 @@ func TestAllocRunner_TaskLeader_StopRestoredTG(t *testing.T) { } } +func TestAllocRunner_Update_Semantics(t *testing.T) { + t.Parallel() + require := require.New(t) + + updatedAlloc := func(a *structs.Allocation) *structs.Allocation { + upd := a.CopySkipJob() + upd.AllocModifyIndex++ + + return upd + } + + alloc := mock.Alloc() + alloc.Job.TaskGroups[0].Tasks[0].Driver = "mock_driver" + conf, cleanup := testAllocRunnerConfig(t, alloc) + defer cleanup() + + ar, err := NewAllocRunner(conf) + require.NoError(err) + + upd1 := updatedAlloc(alloc) + ar.Update(upd1) + + // Update was placed into a queue + require.Len(ar.allocUpdatedCh, 1) + + upd2 := updatedAlloc(alloc) + ar.Update(upd2) + + // Allocation was _replaced_ + + require.Len(ar.allocUpdatedCh, 1) + queuedAlloc := <-ar.allocUpdatedCh + require.Equal(upd2, queuedAlloc) + + // Requeueing older alloc is skipped + ar.Update(upd2) + ar.Update(upd1) + + queuedAlloc = <-ar.allocUpdatedCh + require.Equal(upd2, queuedAlloc) +} + /* import (