diff --git a/client/allocrunner/alloc_runner.go b/client/allocrunner/alloc_runner.go index c64ee3da6..eb13afa2e 100644 --- a/client/allocrunner/alloc_runner.go +++ b/client/allocrunner/alloc_runner.go @@ -42,7 +42,7 @@ type allocRunner struct { // the goroutine is already processing a previous update. taskStateUpdatedCh chan struct{} - // taskStateUpdateHandlerCh is closed when thte task state handling + // taskStateUpdateHandlerCh is closed when the task state handling // goroutine exits. It is unsafe to destroy the local allocation state // before this goroutine exits. taskStateUpdateHandlerCh chan struct{} @@ -252,7 +252,7 @@ func (ar *allocRunner) Restore() error { // // The goroutine is used to compute changes to the alloc's ClientStatus and to // update the server with the new state. -func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskState) { +func (ar *allocRunner) TaskStateUpdated() { select { case ar.taskStateUpdatedCh <- struct{}{}: default: @@ -269,10 +269,6 @@ func (ar *allocRunner) TaskStateUpdated(taskName string, state *structs.TaskStat func (ar *allocRunner) handleTaskStateUpdates() { defer close(ar.taskStateUpdateHandlerCh) - //TODO allocdirs are being left in working dir not temp! 
- //TODO remove taskname and state from TaskStateUpdated - //TODO Ensure Client updates don't callback into AR - for done := false; !done; { select { case <-ar.taskStateUpdatedCh: @@ -335,22 +331,7 @@ func (ar *allocRunner) handleTaskStateUpdates() { ar.logger.Debug("task failure, destroying all tasks", "failed_task", killTask) } - // Kill live task runners - var leader *taskrunner.TaskRunner - for _, tr := range liveRunners { - if tr.IsLeader() { - // Capture the leader to kill last - leader = tr - continue - } - - tr.Kill(context.Background(), killEvent) - } - - // Kill leader last if it exists - if leader != nil { - leader.Kill(context.Background(), killEvent) - } + ar.killTasks() } // Get the client allocation @@ -364,6 +345,41 @@ func (ar *allocRunner) handleTaskStateUpdates() { } } +// killTasks kills all task runners, leader (if there is one) first. Errors are +// logged except taskrunner.ErrTaskNotRunning which is ignored. +func (ar *allocRunner) killTasks() { + // Kill leader first, synchronously + for name, tr := range ar.tasks { + if !tr.IsLeader() { + continue + } + + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + if err != nil && err != taskrunner.ErrTaskNotRunning { + ar.logger.Warn("error stopping leader task", "error", err, "task_name", name) + } + break + } + + // Kill the rest concurrently + wg := sync.WaitGroup{} + for name, tr := range ar.tasks { + if tr.IsLeader() { + continue + } + + wg.Add(1) + go func(name string, tr *taskrunner.TaskRunner) { + defer wg.Done() + err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) + if err != nil && err != taskrunner.ErrTaskNotRunning { + ar.logger.Warn("error stopping task", "error", err, "task_name", name) + } + }(name, tr) + } + wg.Wait() +} + // clientAlloc takes in the task states and returns an Allocation populated // with Client specific fields func (ar *allocRunner) clientAlloc(taskStates map[string]*structs.TaskState) *structs.Allocation { @@ -486,43 +502,24 @@ func (ar *allocRunner) 
Update(update *structs.Allocation) { // Update ar.alloc ar.setAlloc(update) - // Run hooks - if err := ar.update(update); err != nil { - ar.logger.Error("error running update hooks", "error", err) - } - - // If alloc is being terminated, kill all tasks, leader first - if stopping { - // Kill leader first - for name, tr := range ar.tasks { - if !tr.IsLeader() { - continue - } - - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { - ar.logger.Warn("error stopping leader task", "error", err, "task_name", name) - } - break + // Run update hooks if not stopping or dead + if !update.TerminalStatus() { + if err := ar.update(update); err != nil { + ar.logger.Error("error running update hooks", "error", err) } - // Kill the rest - for name, tr := range ar.tasks { - if tr.IsLeader() { - continue - } - - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil { - ar.logger.Warn("error stopping task", "error", err, "task_name", name) - } - } } // Update task runners for _, tr := range ar.tasks { tr.Update(update) } + + // If alloc is being terminated, kill all tasks, leader first + if stopping { + ar.killTasks() + } + } func (ar *allocRunner) Listener() *cstructs.AllocListener { @@ -543,13 +540,8 @@ func (ar *allocRunner) Destroy() { } defer ar.destroyedLock.Unlock() - // Stop tasks - for name, tr := range ar.tasks { - err := tr.Kill(context.TODO(), structs.NewTaskEvent(structs.TaskKilled)) - if err != nil && err != taskrunner.ErrTaskNotRunning { - ar.logger.Warn("failed to kill task", "error", err, "task_name", name) - } - } + // Stop any running tasks + ar.killTasks() // Wait for tasks to exit and postrun hooks to finish <-ar.waitCh diff --git a/client/allocrunner/interfaces/runner.go b/client/allocrunner/interfaces/runner.go index e488d932a..4789e4f94 100644 --- a/client/allocrunner/interfaces/runner.go +++ b/client/allocrunner/interfaces/runner.go @@ -2,8 +2,6 @@ package interfaces import ( 
"github.com/hashicorp/nomad/client/allocrunner/state" - "github.com/hashicorp/nomad/nomad/structs" - cstructs "github.com/hashicorp/nomad/client/structs" ) @@ -24,8 +22,9 @@ type AllocRunner interface { // TaskStateHandler exposes a handler to be called when a task's state changes type TaskStateHandler interface { - // TaskStateUpdated is used to emit updated task state - TaskStateUpdated(task string, state *structs.TaskState) + // TaskStateUpdated is used to notify the alloc runner about task state + // changes. + TaskStateUpdated() } -// AllocStatsReporter gives acess to the latest resource usage from the +// AllocStatsReporter gives access to the latest resource usage from the diff --git a/client/allocrunner/taskrunner/task_runner.go b/client/allocrunner/taskrunner/task_runner.go index f683544ba..d0448dcf2 100644 --- a/client/allocrunner/taskrunner/task_runner.go +++ b/client/allocrunner/taskrunner/task_runner.go @@ -610,26 +610,21 @@ func (tr *TaskRunner) Restore() error { // UpdateState sets the task runners allocation state and triggers a server // update. func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { - tr.logger.Debug("setting task state", "state", state, "event", event.Type) + tr.logger.Trace("setting task state", "state", state, "event", event.Type) // Update the local state - stateCopy := tr.setStateLocal(state, event) + tr.setStateLocal(state, event) // Notify the alloc runner of the transition - tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy) + tr.stateUpdater.TaskStateUpdated() } -// setStateLocal updates the local in-memory state, persists a copy to disk and returns a -// copy of the task's state. +// setStateLocal updates the local in-memory state and persists a copy to +// disk. 
-func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *structs.TaskState { +func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) { tr.stateLock.Lock() defer tr.stateLock.Unlock() - //XXX REMOVE ME AFTER TESTING - if state == "" { - panic("UpdateState must not be called with an empty state") - } - // Update the task state oldState := tr.state.State taskState := tr.state @@ -681,8 +676,6 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str // try to persist it again. tr.logger.Error("error persisting task state", "error", err, "event", event, "state", state) } - - return tr.state.Copy() } // EmitEvent appends a new TaskEvent to this task's TaskState. The actual @@ -692,6 +685,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str // logged. Use AppendEvent to simply add a new event. func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { tr.stateLock.Lock() + defer tr.stateLock.Unlock() tr.appendEvent(event) @@ -701,11 +695,8 @@ func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { tr.logger.Warn("error persisting event", "error", err, "event", event) } - stateCopy := tr.state.Copy() - tr.stateLock.Unlock() - // Notify the alloc runner of the event - tr.stateUpdater.TaskStateUpdated(tr.taskName, stateCopy) + tr.stateUpdater.TaskStateUpdated() } // AppendEvent appends a new TaskEvent to this task's TaskState. The actual @@ -769,8 +760,10 @@ func (tr *TaskRunner) Update(update *structs.Allocation) { // Update tr.alloc tr.setAlloc(update) - // Trigger update hooks - tr.triggerUpdateHooks() + // Trigger update hooks if not terminal + if !update.TerminalStatus() { + tr.triggerUpdateHooks() + } } // triggerUpdate if there isn't already an update pending. Should be called