diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go index a9cfbea1f..4d48a29d3 100644 --- a/client/allocrunnerv2/alloc_runner.go +++ b/client/allocrunnerv2/alloc_runner.go @@ -24,6 +24,14 @@ import ( "github.com/hashicorp/nomad/nomad/structs" ) +const ( + // updateChCap is the capacity of AllocRunner's updateCh. It must be 1 + // as we only want to process the latest update, so if there's already + // a pending update it will be removed from the chan before adding the + // newer update. + updateChCap = 1 +) + // allocRunner is used to run all the tasks in a given allocation type allocRunner struct { // id is the ID of the allocation. Can be accessed without a lock @@ -70,7 +78,9 @@ type allocRunner struct { tasks map[string]*taskrunner.TaskRunner tasksLock sync.RWMutex - // updateCh receives allocation updates via the Update method + // updateCh receives allocation updates via the Update method. Must + // have buffer size 1 in order to support dropping pending updates when + // a newer allocation is received. updateCh chan *structs.Allocation } @@ -90,7 +100,7 @@ func NewAllocRunner(config *Config) (*allocRunner, error) { vaultClient: config.Vault, tasks: make(map[string]*taskrunner.TaskRunner, len(tg.Tasks)), waitCh: make(chan struct{}), - updateCh: make(chan *structs.Allocation), + updateCh: make(chan *structs.Allocation, updateChCap), state: &state.State{}, stateDB: config.StateDB, stateUpdater: config.StateUpdater, @@ -169,9 +179,12 @@ MAIN: // TaskRunners have all exited break MAIN case updated := <-ar.updateCh: - // Updated alloc received - //XXX Update hooks - //XXX Update ar.alloc + // Update ar.alloc + ar.setAlloc(updated) + + //TODO Run AR Update hooks + + // Update task runners for _, tr := range ar.tasks { tr.Update(updated) } @@ -206,13 +219,18 @@ func (ar *allocRunner) runImpl() <-chan struct{} { } // Alloc returns the current allocation being run by this runner. -//XXX how do we handle mutate the state saving stuff func (ar *allocRunner) Alloc() *structs.Allocation { ar.allocLock.RLock() defer ar.allocLock.RUnlock() return ar.alloc } +func (ar *allocRunner) setAlloc(updated *structs.Allocation) { + ar.allocLock.Lock() + ar.alloc = updated + ar.allocLock.Unlock() +} + // SaveState does all the state related stuff. Who knows. FIXME //XXX do we need to do periodic syncing? if Saving is only called *before* Run // *and* within Run -- *and* Updates are applid within Run -- we may be able to @@ -389,10 +407,19 @@ func getClientStatus(taskStates map[string]*structs.TaskState) (status, descript // Update the running allocation with a new version received from the server. // -// This method is safe for calling concurrently with Run() and does not modify -// the passed in allocation. +// This method sends the updated alloc to Run for serially processing updates. +// If there is already a pending update it will be discarded and replaced by +// the latest update. func (ar *allocRunner) Update(update *structs.Allocation) { - ar.updateCh <- update + select { + case ar.updateCh <- update: + // Updated alloc sent + case <-ar.updateCh: + // There was a pending update; replace it with the new update. + // This also prevents Update() from blocking if its called + // concurrently with Run() exiting. + ar.updateCh <- update + } } // Destroy the alloc runner by stopping it if it is still running and cleaning diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index d9705711f..d73c46524 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -38,6 +38,12 @@ const ( // killFailureLimit is how many times we will attempt to kill a task before // giving up and potentially leaking resources. killFailureLimit = 5 + + // triggerUpdatechCap is the capacity for the triggerUpdateCh used for + // triggering updates. It should be exactly 1 as even if multiple + // updates have come in since the last one was handled, we only need to + // handle the last one. + triggerUpdateChCap = 1 ) var ( @@ -91,8 +97,10 @@ type TaskRunner struct { // Logger is the logger for the task runner. logger log.Logger - // updateCh receives Alloc updates - updateCh chan *structs.Allocation + // triggerUpdateCh is ticked whenever update hooks need to be run and + // must be created with cap=1 to signal a pending update and prevent + // callers from deadlocking if the receiver has exited. + triggerUpdateCh chan struct{} // waitCh is closed when the task runner has transitioned to a terminal // state @@ -182,14 +190,14 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { consulClient: config.Consul, vaultClient: config.VaultClient, //XXX Make a Copy to avoid races? - state: config.Alloc.TaskStates[config.Task.Name], - localState: config.LocalState, - stateDB: config.StateDB, - stateUpdater: config.StateUpdater, - ctx: trCtx, - ctxCancel: trCancel, - updateCh: make(chan *structs.Allocation), - waitCh: make(chan struct{}), + state: config.Alloc.TaskStates[config.Task.Name], + localState: config.LocalState, + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + ctx: trCtx, + ctxCancel: trCancel, + triggerUpdateCh: make(chan struct{}, triggerUpdateChCap), + waitCh: make(chan struct{}), } // Create the logger based on the allocation ID @@ -258,6 +266,11 @@ func (tr *TaskRunner) Run() { defer close(tr.waitCh) var handle driver.DriverHandle + // Updates are handled asynchronously with the other hooks but each + // triggered update - whether due to alloc updates or a new vault token + // - should be handled serially. + go tr.handleUpdates() + MAIN: for tr.ctx.Err() == nil { // Run the prestart hooks @@ -329,6 +342,20 @@ MAIN: tr.logger.Debug("task run loop exiting") } +// handleUpdates runs update hooks when triggerUpdateCh is ticked and exits +// when Run has returned. Should only be run in a goroutine from Run. +func (tr *TaskRunner) handleUpdates() { + for { + select { + case <-tr.triggerUpdateCh: + // Update triggered; run hooks + tr.updateHooks() + case <-tr.waitCh: + return + } + } +} + func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { // Determine if we should restart state, when := tr.restartTracker.GetState() @@ -665,16 +692,28 @@ func (tr *TaskRunner) WaitCh() <-chan struct{} { } // Update the running allocation with a new version received from the server. +// Calls Update hooks asynchronously with Run(). // // This method is safe for calling concurrently with Run() and does not modify // the passed in allocation. func (tr *TaskRunner) Update(update *structs.Allocation) { + // Update tr.alloc + tr.setAlloc(update) + + // Trigger update hooks + tr.triggerUpdateHooks() +} + +// triggerUpdate if there isn't already an update pending. Should be called +// instead of calling updateHooks directly to serialize runs of update hooks. +// TaskRunner state should be updated prior to triggering update hooks. +// +// Does not block. +func (tr *TaskRunner) triggerUpdateHooks() { select { - case tr.updateCh <- update: - case <-tr.WaitCh(): - //XXX Do we log here like we used to? If we're just - //shutting down it's not an error to drop the update as - //it will be applied on startup + case tr.triggerUpdateCh <- struct{}{}: + default: + // already an update hook pending } } diff --git a/client/allocrunnerv2/taskrunner/task_runner_getters.go b/client/allocrunnerv2/taskrunner/task_runner_getters.go index 013f55849..8a40c65bc 100644 --- a/client/allocrunnerv2/taskrunner/task_runner_getters.go +++ b/client/allocrunnerv2/taskrunner/task_runner_getters.go @@ -11,6 +11,12 @@ func (tr *TaskRunner) Alloc() *structs.Allocation { return tr.alloc } +func (tr *TaskRunner) setAlloc(updated *structs.Allocation) { + tr.allocLock.Lock() + tr.alloc = updated + tr.allocLock.Unlock() +} + func (tr *TaskRunner) Task() *structs.Task { tr.taskLock.RLock() defer tr.taskLock.RUnlock() @@ -29,10 +35,18 @@ func (tr *TaskRunner) getVaultToken() string { return tr.vaultToken } +// setVaultToken updates the vault token on the task runner as well as in the +// task's environment. These two places must be set atomically to avoid a task +// seeing a different token on the task runner and in its environment. func (tr *TaskRunner) setVaultToken(token string) { tr.vaultTokenLock.Lock() defer tr.vaultTokenLock.Unlock() + + // Update the Vault token on the runner tr.vaultToken = token + + // Update the task's environment + tr.envBuilder.SetVaultToken(token, tr.task.Vault.Env) } func (tr *TaskRunner) getDriverHandle() driver.DriverHandle { diff --git a/client/allocrunnerv2/taskrunner/task_runner_hooks.go b/client/allocrunnerv2/taskrunner/task_runner_hooks.go index ac44c1451..4ff6bb0a4 100644 --- a/client/allocrunnerv2/taskrunner/task_runner_hooks.go +++ b/client/allocrunnerv2/taskrunner/task_runner_hooks.go @@ -288,7 +288,9 @@ func (tr *TaskRunner) stop() error { return merr.ErrorOrNil() } -// update is used to run the runners update hooks. +// update is used to run the runners update hooks. Should only be called from +// Run(). To trigger an update, update state on the TaskRunner and call +// triggerUpdateHooks. func (tr *TaskRunner) updateHooks() { if tr.logger.IsTrace() { start := time.Now() diff --git a/client/allocrunnerv2/taskrunner/vault_hook.go b/client/allocrunnerv2/taskrunner/vault_hook.go index 7b52e8b10..479479e71 100644 --- a/client/allocrunnerv2/taskrunner/vault_hook.go +++ b/client/allocrunnerv2/taskrunner/vault_hook.go @@ -38,14 +38,11 @@ type vaultTokenUpdateHandler interface { } func (tr *TaskRunner) updatedVaultToken(token string) { - // Update the Vault token on the runner + // Update the task runner and environment tr.setVaultToken(token) - // Update the tasks environment - tr.envBuilder.SetVaultToken(token, tr.task.Vault.Env) - - // Update the hooks with the new Vault token - tr.updateHooks() + // Trigger update hooks with the new Vault token + tr.triggerUpdateHooks() } type vaultHookConfig struct { diff --git a/client/client.go b/client/client.go index 7d1d7537e..82e734d14 100644 --- a/client/client.go +++ b/client/client.go @@ -1845,10 +1845,8 @@ func (c *Client) runAllocs(update *allocUpdates) { // Update the existing allocations for _, update := range diff.updated { - if err := c.updateAlloc(update); err != nil { - c.logger.Printf("[ERR] client: failed to update alloc %q: %v", - update.ID, err) - } + c.logger.Printf("[TRACE] client: updating alloc %q to index %d", update.ID, update.AllocModifyIndex) + c.updateAlloc(update) } // Make room for new allocations before running @@ -1893,17 +1891,16 @@ func (c *Client) removeAlloc(allocID string) { } // updateAlloc is invoked when we should update an allocation -func (c *Client) updateAlloc(update *structs.Allocation) error { +func (c *Client) updateAlloc(update *structs.Allocation) { c.allocLock.Lock() defer c.allocLock.Unlock() ar, ok := c.allocs[update.ID] if !ok { c.logger.Printf("[WARN] client: missing context for alloc %q", update.ID) - return nil + return } ar.Update(update) - return nil } // addAlloc is invoked when we should add an allocation