From d200a835c9d467ebfe2be321fccbc5823c6f7992 Mon Sep 17 00:00:00 2001
From: Alex Dadgar
Date: Wed, 5 Oct 2016 15:11:09 -0700
Subject: [PATCH] Fix handling of restart in TaskEvents

---
 api/tasks.go             |  1 +
 client/restarts.go       | 46 ++++++++++++++++++++--------
 client/restarts_test.go  | 10 +++++++
 client/task_runner.go    | 65 ++++++++++++++++++++++++----------------
 command/alloc_status.go  | 12 ++++----
 nomad/structs/structs.go | 13 ++++----
 6 files changed, 100 insertions(+), 47 deletions(-)

diff --git a/api/tasks.go b/api/tasks.go
index dae9164dc..8466c8cb3 100644
--- a/api/tasks.go
+++ b/api/tasks.go
@@ -249,6 +249,7 @@ const (
 	TaskVaultRenewalFailed = "Vault token renewal failed"
 	TaskSiblingFailed = "Sibling task failed"
 	TaskSignaling = "Signaling"
+	TaskRestartSignal = "Restart Signaled"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data
diff --git a/client/restarts.go b/client/restarts.go
index 3fbffc4e0..1c6a6b843 100644
--- a/client/restarts.go
+++ b/client/restarts.go
@@ -34,15 +34,16 @@ func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTr
 }
 
 type RestartTracker struct {
-	waitRes   *cstructs.WaitResult
-	startErr  error
-	count     int       // Current number of attempts.
-	onSuccess bool      // Whether to restart on successful exit code.
-	startTime time.Time // When the interval began
-	reason    string    // The reason for the last state
-	policy    *structs.RestartPolicy
-	rand      *rand.Rand
-	lock      sync.Mutex
+	waitRes          *cstructs.WaitResult
+	startErr         error
+	restartTriggered bool      // Whether the task has been signalled to be restarted
+	count            int       // Current number of attempts.
+	onSuccess        bool      // Whether to restart on successful exit code.
+	startTime        time.Time // When the interval began
+	reason           string    // The reason for the last state
+	policy           *structs.RestartPolicy
+	rand             *rand.Rand
+	lock             sync.Mutex
 }
 
 // SetPolicy updates the policy used to determine restarts.
@@ -69,6 +70,15 @@ func (r *RestartTracker) SetWaitResult(res *cstructs.WaitResult) *RestartTracker
 	return r
 }
 
+// SetRestartTriggered is used to mark that the task has been signalled to be
+// restarted
+func (r *RestartTracker) SetRestartTriggered() *RestartTracker {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.restartTriggered = true
+	return r
+}
+
 // GetReason returns a human-readable description for the last state returned by
 // GetState.
 func (r *RestartTracker) GetReason() string {
@@ -111,13 +121,25 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
 		r.startTime = now
 	}
 
+	var state string
+	var dur time.Duration
 	if r.startErr != nil {
-		return r.handleStartError()
+		state, dur = r.handleStartError()
 	} else if r.waitRes != nil {
-		return r.handleWaitResult()
+		state, dur = r.handleWaitResult()
+	} else if r.restartTriggered {
+		state, dur = structs.TaskRestarting, 0
+		r.reason = ""
 	} else {
-		return "", 0
+		state, dur = "", 0
 	}
+
+	// Clear out the existing state
+	r.startErr = nil
+	r.waitRes = nil
+	r.restartTriggered = false
+
+	return state, dur
 }
 
 // handleStartError returns the new state and potential wait duration for
diff --git a/client/restarts_test.go b/client/restarts_test.go
index 0a15e2285..86960b1f7 100644
--- a/client/restarts_test.go
+++ b/client/restarts_test.go
@@ -94,6 +94,16 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
 	}
 }
 
+func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
+	t.Parallel()
+	p := testPolicy(true, structs.RestartPolicyModeFail)
+	p.Attempts = 0
+	rt := newRestartTracker(p, structs.JobTypeService)
+	if state, when := rt.SetRestartTriggered().GetState(); state != structs.TaskRestarting || when != 0 {
+		t.Fatalf("expect restart immediately, got %v %v", state, when)
+	}
+}
+
 func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
 	t.Parallel()
 	p := testPolicy(true, structs.RestartPolicyModeFail)
diff --git a/client/task_runner.go b/client/task_runner.go
index 251c0e618..9221e54a5 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -516,13 +516,18 @@ func (r *TaskRunner) run() {
 			case event := <-r.restartCh:
 				r.logger.Printf("[DEBUG] client: task being restarted: %s", event.RestartReason)
 				r.setState(structs.TaskStateRunning, event)
+				r.killTask(event.RestartReason, stopCollection)
 
-				// TODO do a kill and then goto restart
+				// Since the restart isn't from a failure, restart immediately
+				// and don't count against the restart policy
+				r.restartTracker.SetRestartTriggered()
+				break WAIT
 
 			case event := <-r.killCh:
 				r.logger.Printf("[ERR] client: task being killed: %s", event.KillReason)
 				r.setState(structs.TaskStateRunning, event)
-				// TODO
+				r.killTask(event.KillReason, stopCollection)
+				return
 
 			case <-r.destroyCh:
 				// Store the task event that provides context on the task destroy.
@@ -530,28 +535,7 @@
 					r.setState(structs.TaskStateRunning, r.destroyEvent)
 				}
 
-				// Mark that we received the kill event
-				timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
-				r.setState(structs.TaskStateRunning,
-					structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout))
-
-				// Kill the task using an exponential backoff in-case of failures.
-				destroySuccess, err := r.handleDestroy()
-				if !destroySuccess {
-					// We couldn't successfully destroy the resource created.
-					r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
-				}
-
-				// Stop collection of the task's resource usage
-				close(stopCollection)
-
-				// Store that the task has been destroyed and any associated error.
-				r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
-
-				r.runningLock.Lock()
-				r.running = false
-				r.runningLock.Unlock()
-
+				r.killTask("", stopCollection)
 				return
 			}
 		}
@@ -613,6 +597,37 @@ func (r *TaskRunner) shouldRestart() bool {
 	return true
 }
 
+func (r *TaskRunner) killTask(reason string, statsCh chan struct{}) {
+	r.runningLock.Lock()
+	running := r.running
+	r.runningLock.Unlock()
+	if !running {
+		return
+	}
+
+	// Mark that we received the kill event
+	timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
+	r.setState(structs.TaskStateRunning,
+		structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout).SetKillReason(reason))
+
+	// Kill the task using an exponential backoff in case of failures.
+	destroySuccess, err := r.handleDestroy()
+	if !destroySuccess {
+		// We couldn't successfully destroy the resource created.
+		r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
+	}
+
+	// Stop collection of the task's resource usage
+	close(statsCh)
+
+	// Store that the task has been destroyed and any associated error.
+	r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
+
+	r.runningLock.Lock()
+	r.running = false
+	r.runningLock.Unlock()
+}
+
 // startTask creates the driver and starts the task.
 func (r *TaskRunner) startTask() error {
 	// Create a driver
@@ -761,7 +776,7 @@ func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
 
 func (r *TaskRunner) Restart(source, reason string) {
 	reasonStr := fmt.Sprintf("%s: %s", source, reason)
-	event := structs.NewTaskEvent(structs.TaskRestarting).SetRestartReason(reasonStr)
+	event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reasonStr)
 
 	r.logger.Printf("[DEBUG] client: restarting task %v for alloc %q: %v",
 		r.task.Name, r.alloc.ID, reasonStr)
diff --git a/command/alloc_status.go b/command/alloc_status.go
index 758e8ee0c..4e2de1b9f 100644
--- a/command/alloc_status.go
+++ b/command/alloc_status.go
@@ -325,11 +325,7 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
 		case api.TaskRestarting:
 			in := fmt.Sprintf("Task restarting in %v", time.Duration(event.StartDelay))
 			if event.RestartReason != "" && event.RestartReason != client.ReasonWithinPolicy {
-				if event.StartDelay == 0 {
-					desc = event.RestartReason
-				} else {
-					desc = fmt.Sprintf("%s - %s", event.RestartReason, in)
-				}
+				desc = fmt.Sprintf("%s - %s", event.RestartReason, in)
 			} else {
 				desc = in
 			}
@@ -370,6 +366,12 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
 			} else {
 				desc = fmt.Sprintf("Task being sent signal %v: %v", sig, reason)
 			}
+		case api.TaskRestartSignal:
+			if event.RestartReason != "" {
+				desc = event.RestartReason
+			} else {
+				desc = "Task signaled to restart"
+			}
 		}
 
 		// Reverse order so we are sorted by time
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index e84605ca5..fff8b3e82 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -2387,17 +2387,20 @@ const (
 	// TaskKilled indicates a user has killed the task.
 	TaskKilled = "Killed"
 
-	// TaskRestarting indicates that task terminated and is being restarted or
-	// that it is being forced to be restarted.
+	// TaskRestarting indicates that task terminated and is being restarted.
 	TaskRestarting = "Restarting"
 
-	// TaskSignaling indicates that the task is being signalled.
- TaskSignaling = "Signaling" - // TaskNotRestarting indicates that the task has failed and is not being // restarted because it has exceeded its restart policy. TaskNotRestarting = "Not Restarting" + // TaskRestartSignal indicates that the task has been signalled to be + // restarted + TaskRestartSignal = "Restart Signaled" + + // TaskSignaling indicates that the task is being signalled. + TaskSignaling = "Signaling" + // TaskDownloadingArtifacts means the task is downloading the artifacts // specified in the task. TaskDownloadingArtifacts = "Downloading Artifacts"
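
The patch changes the RestartTracker contract in a way that is easy to miss in the diff: GetState now clears waitRes, startErr, and restartTriggered on every call, and a triggered restart reports structs.TaskRestarting with a zero wait without consuming a restart-policy attempt. The following standalone Go sketch shows that contract in isolation; the names tracker and taskRestarting are illustrative stand-ins, not the Nomad client package.

package main

import (
	"fmt"
	"sync"
	"time"
)

const taskRestarting = "Restarting" // stand-in for structs.TaskRestarting

type tracker struct {
	lock             sync.Mutex
	restartTriggered bool
	count            int // restart-policy attempts consumed so far
}

// SetRestartTriggered marks a user-requested restart, mirroring the new
// RestartTracker.SetRestartTriggered.
func (t *tracker) SetRestartTriggered() *tracker {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.restartTriggered = true
	return t
}

// GetState mirrors the patched GetState: a triggered restart yields an
// immediate restart, leaves count untouched, and the pending flag is cleared.
func (t *tracker) GetState() (string, time.Duration) {
	t.lock.Lock()
	defer t.lock.Unlock()

	var state string
	var dur time.Duration
	if t.restartTriggered {
		state, dur = taskRestarting, 0
	}

	// Clear out the existing state so the next call starts fresh.
	t.restartTriggered = false
	return state, dur
}

func main() {
	rt := &tracker{}
	state, wait := rt.SetRestartTriggered().GetState()
	fmt.Println(state, wait, rt.count) // Restarting 0s 0

	// A second call sees no pending trigger.
	state, wait = rt.GetState()
	fmt.Println(state == "", wait) // true 0s
}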
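
In the run loop itself, the two channels now behave differently: a message on restartCh kills the running task, records the trigger, and breaks out of the wait loop so the outer loop starts the task again, while a message on killCh kills the task and returns for good. The rough, self-contained sketch below shows just that control flow under hypothetical names (runner, killTask, plain strings instead of TaskEvents); it is not the real client/task_runner.go.

package main

import "fmt"

type runner struct {
	restartCh chan string
	killCh    chan string
	running   bool
}

// killTask mirrors the guard in the new TaskRunner.killTask: it is a no-op
// when nothing is running, otherwise it tears the task down.
func (r *runner) killTask(reason string) {
	if !r.running {
		return
	}
	fmt.Println("killing task:", reason)
	r.running = false
}

func (r *runner) run() {
	for {
		r.running = true
		fmt.Println("task started")

	WAIT:
		for {
			select {
			case reason := <-r.restartCh:
				r.killTask(reason)
				// Not a failure: restart immediately, no policy attempt used.
				break WAIT
			case reason := <-r.killCh:
				r.killTask(reason)
				return
			}
		}
	}
}

func main() {
	r := &runner{restartCh: make(chan string), killCh: make(chan string)}
	done := make(chan struct{})
	go func() { r.run(); close(done) }()

	r.restartCh <- "user requested restart" // task is killed and started again
	r.killCh <- "alloc stopped"             // task is killed and run() returns
	<-done
}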
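
On the CLI side, the split into two event types decides what alloc-status prints: TaskRestartSignal shows the caller-supplied reason, while TaskRestarting keeps describing the policy-driven delayed restart. A minimal sketch of that mapping follows; taskEvent and describe are invented stand-ins, the sample values are made up, and the real command also special-cases client.ReasonWithinPolicy, which is omitted here.

package main

import (
	"fmt"
	"time"
)

type taskEvent struct {
	Type          string
	RestartReason string
	StartDelay    time.Duration
}

// describe maps an event to the human-readable description column.
func describe(e taskEvent) string {
	switch e.Type {
	case "Restart Signaled": // api.TaskRestartSignal
		if e.RestartReason != "" {
			return e.RestartReason
		}
		return "Task signaled to restart"
	case "Restarting": // api.TaskRestarting
		in := fmt.Sprintf("Task restarting in %v", e.StartDelay)
		if e.RestartReason != "" {
			return fmt.Sprintf("%s - %s", e.RestartReason, in)
		}
		return in
	}
	return e.Type
}

func main() {
	fmt.Println(describe(taskEvent{Type: "Restart Signaled", RestartReason: "template: file changed"}))
	fmt.Println(describe(taskEvent{Type: "Restarting", StartDelay: 15 * time.Second}))
}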