Fix handling of restart in TaskEvents

Alex Dadgar
2016-10-05 15:11:09 -07:00
parent b976633e4d
commit d200a835c9
6 changed files with 100 additions and 47 deletions


@@ -249,6 +249,7 @@ const (
TaskVaultRenewalFailed = "Vault token renewal failed"
TaskSiblingFailed = "Sibling task failed"
TaskSignaling = "Signaling"
TaskRestartSignal = "Restart Signaled"
)
// TaskEvent is an event that affects the state of a task and contains meta-data


@@ -34,15 +34,16 @@ func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTr
}
type RestartTracker struct {
waitRes *cstructs.WaitResult
startErr error
count int // Current number of attempts.
onSuccess bool // Whether to restart on successful exit code.
startTime time.Time // When the interval began
reason string // The reason for the last state
policy *structs.RestartPolicy
rand *rand.Rand
lock sync.Mutex
waitRes *cstructs.WaitResult
startErr error
restartTriggered bool // Whether the task has been signalled to be restarted
count int // Current number of attempts.
onSuccess bool // Whether to restart on successful exit code.
startTime time.Time // When the interval began
reason string // The reason for the last state
policy *structs.RestartPolicy
rand *rand.Rand
lock sync.Mutex
}
// SetPolicy updates the policy used to determine restarts.
@@ -69,6 +70,15 @@ func (r *RestartTracker) SetWaitResult(res *cstructs.WaitResult) *RestartTracker
return r
}
// SetRestartTriggered is used to mark that the task has been signalled to be
// restarted
func (r *RestartTracker) SetRestartTriggered() *RestartTracker {
r.lock.Lock()
defer r.lock.Unlock()
r.restartTriggered = true
return r
}
// GetReason returns a human-readable description for the last state returned by
// GetState.
func (r *RestartTracker) GetReason() string {
@@ -111,13 +121,25 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
r.startTime = now
}
var state string
var dur time.Duration
if r.startErr != nil {
return r.handleStartError()
state, dur = r.handleStartError()
} else if r.waitRes != nil {
return r.handleWaitResult()
state, dur = r.handleWaitResult()
} else if r.restartTriggered {
state, dur = structs.TaskRestarting, 0
r.reason = ""
} else {
return "", 0
state, dur = "", 0
}
// Clear out the existing state
r.startErr = nil
r.waitRes = nil
r.restartTriggered = false
return state, dur
}
// handleStartError returns the new state and potential wait duration for
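The GetState change above is a consume-and-clear pattern: the stored start error, wait result, or restart trigger is read once, mapped to a state, and then reset so a stale trigger cannot cause a second restart. Below is a minimal standalone sketch of that pattern (simplified types, not Nomad's actual RestartTracker):

package main

import (
    "fmt"
    "sync"
    "time"
)

// tracker is a stripped-down stand-in for the RestartTracker in this diff.
type tracker struct {
    lock             sync.Mutex
    restartTriggered bool
}

// SetRestartTriggered marks that a restart was requested; it returns the
// tracker so calls can be chained, as in the test added by this commit.
func (t *tracker) SetRestartTriggered() *tracker {
    t.lock.Lock()
    defer t.lock.Unlock()
    t.restartTriggered = true
    return t
}

// GetState reads the trigger, maps it to a state and wait duration, and then
// clears it so the trigger only applies once.
func (t *tracker) GetState() (string, time.Duration) {
    t.lock.Lock()
    defer t.lock.Unlock()

    state, dur := "", time.Duration(0)
    if t.restartTriggered {
        // A user-triggered restart is immediate and does not count against
        // the restart policy.
        state, dur = "Restarting", 0
    }
    t.restartTriggered = false
    return state, dur
}

func main() {
    t := &tracker{}
    state, dur := t.SetRestartTriggered().GetState()
    fmt.Println(state, dur) // Restarting 0s
}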


@@ -94,6 +94,16 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
}
}
func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)
p.Attempts = 0
rt := newRestartTracker(p, structs.JobTypeService)
if state, when := rt.SetRestartTriggered().GetState(); state != structs.TaskRestarting || when != 0 {
t.Fatalf("expect restart immediately, got %v %v", state, when)
}
}
func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
t.Parallel()
p := testPolicy(true, structs.RestartPolicyModeFail)


@@ -516,13 +516,18 @@ func (r *TaskRunner) run() {
case event := <-r.restartCh:
r.logger.Printf("[DEBUG] client: task being restarted: %s", event.RestartReason)
r.setState(structs.TaskStateRunning, event)
r.killTask(event.RestartReason, stopCollection)
// TODO do a kill and then goto restart
// Since the restart isn't from a failure, restart immediately
// and don't count against the restart policy
r.restartTracker.SetRestartTriggered()
break WAIT
case event := <-r.killCh:
r.logger.Printf("[ERR] client: task being killed: %s", event.KillReason)
r.setState(structs.TaskStateRunning, event)
// TODO
r.killTask(event.KillReason, stopCollection)
return
case <-r.destroyCh:
// Store the task event that provides context on the task destroy.
@@ -530,28 +535,7 @@ func (r *TaskRunner) run() {
r.setState(structs.TaskStateRunning, r.destroyEvent)
}
// Mark that we received the kill event
timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
r.setState(structs.TaskStateRunning,
structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout))
// Kill the task using an exponential backoff in case of failures.
destroySuccess, err := r.handleDestroy()
if !destroySuccess {
// We couldn't successfully destroy the resource created.
r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
}
// Stop collection of the task's resource usage
close(stopCollection)
// Store that the task has been destroyed and any associated error.
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
r.runningLock.Lock()
r.running = false
r.runningLock.Unlock()
r.killTask("", stopCollection)
return
}
}
@@ -613,6 +597,37 @@ func (r *TaskRunner) shouldRestart() bool {
return true
}
func (r *TaskRunner) killTask(reason string, statsCh chan struct{}) {
r.runningLock.Lock()
running := r.running
r.runningLock.Unlock()
if !running {
return
}
// Mark that we received the kill event
timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
r.setState(structs.TaskStateRunning,
structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout).SetKillReason(reason))
// Kill the task using an exponential backoff in case of failures.
destroySuccess, err := r.handleDestroy()
if !destroySuccess {
// We couldn't successfully destroy the resource created.
r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
}
// Stop collection of the task's resource usage
close(statsCh)
// Store that the task has been destroyed and any associated error.
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
r.runningLock.Lock()
r.running = false
r.runningLock.Unlock()
}
// startTask creates the driver and starts the task.
func (r *TaskRunner) startTask() error {
// Create a driver
@@ -761,7 +776,7 @@ func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
func (r *TaskRunner) Restart(source, reason string) {
reasonStr := fmt.Sprintf("%s: %s", source, reason)
event := structs.NewTaskEvent(structs.TaskRestarting).SetRestartReason(reasonStr)
event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reasonStr)
r.logger.Printf("[DEBUG] client: restarting task %v for alloc %q: %v",
r.task.Name, r.alloc.ID, reasonStr)
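The run() change above replaces the inline destroy logic with a shared killTask helper, so a restart signal can kill the running task and fall back into the normal restart path, while a kill request still terminates the task and returns. A standalone sketch of that select-loop shape (simplified, with plain strings instead of Nomad's event types) follows:

package main

import "fmt"

// runner is a stripped-down stand-in for the TaskRunner in this diff.
type runner struct {
    restartCh chan string
    killCh    chan string
    running   bool
}

// killTask mirrors the helper factored out by this commit: it is a no-op when
// nothing is running, otherwise it stops the task and records the reason.
func (r *runner) killTask(reason string) {
    if !r.running {
        return
    }
    r.running = false
    fmt.Println("killed:", reason)
}

// wait mimics the WAIT select loop: a restart signal kills the task and
// returns true so the caller re-enters its restart logic; a kill request
// kills the task and returns false so the caller exits.
func (r *runner) wait() (restart bool) {
    select {
    case reason := <-r.restartCh:
        r.killTask(reason)
        return true // break WAIT and restart immediately
    case reason := <-r.killCh:
        r.killTask(reason)
        return false
    }
}

func main() {
    r := &runner{restartCh: make(chan string, 1), killCh: make(chan string, 1), running: true}
    r.restartCh <- "user requested restart"
    fmt.Println("restart:", r.wait())
}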


@@ -325,11 +325,7 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
case api.TaskRestarting:
in := fmt.Sprintf("Task restarting in %v", time.Duration(event.StartDelay))
if event.RestartReason != "" && event.RestartReason != client.ReasonWithinPolicy {
if event.StartDelay == 0 {
desc = event.RestartReason
} else {
desc = fmt.Sprintf("%s - %s", event.RestartReason, in)
}
desc = fmt.Sprintf("%s - %s", event.RestartReason, in)
} else {
desc = in
}
@@ -370,6 +366,12 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
} else {
desc = fmt.Sprintf("Task being sent signal %v: %v", sig, reason)
}
case api.TaskRestartSignal:
if event.RestartReason != "" {
desc = event.RestartReason
} else {
desc = "Task signaled to restart"
}
}
// Reverse order so we are sorted by time
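The command change above drops the StartDelay == 0 special case from TaskRestarting, because an immediate, user-requested restart is now reported through the separate TaskRestartSignal event. A standalone sketch of the resulting description logic (simplified; the real code also ignores reasons equal to client.ReasonWithinPolicy) is:

package main

import (
    "fmt"
    "time"
)

// describe picks a human-readable description for the two restart-related
// event types touched by this commit.
func describe(eventType, restartReason string, startDelay time.Duration) string {
    switch eventType {
    case "Restarting":
        in := fmt.Sprintf("Task restarting in %v", startDelay)
        if restartReason != "" {
            return fmt.Sprintf("%s - %s", restartReason, in)
        }
        return in
    case "Restart Signaled":
        if restartReason != "" {
            return restartReason
        }
        return "Task signaled to restart"
    }
    return ""
}

func main() {
    fmt.Println(describe("Restarting", "Exceeded allowed memory", 15*time.Second))
    fmt.Println(describe("Restart Signaled", "user: config changed", 0))
}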


@@ -2387,17 +2387,20 @@ const (
// TaskKilled indicates a user has killed the task.
TaskKilled = "Killed"
// TaskRestarting indicates that task terminated and is being restarted or
// that it is being forced to be restarted.
// TaskRestarting indicates that the task terminated and is being restarted.
TaskRestarting = "Restarting"
// TaskSignaling indicates that the task is being signalled.
TaskSignaling = "Signaling"
// TaskNotRestarting indicates that the task has failed and is not being
// restarted because it has exceeded its restart policy.
TaskNotRestarting = "Not Restarting"
// TaskRestartSignal indicates that the task has been signalled to be
// restarted
TaskRestartSignal = "Restart Signaled"
// TaskSignaling indicates that the task is being signalled.
TaskSignaling = "Signaling"
// TaskDownloadingArtifacts means the task is downloading the artifacts
// specified in the task.
TaskDownloadingArtifacts = "Downloading Artifacts"