Refactor task runner to include driver starting into restart policy and add recoverable errors
@@ -148,9 +148,12 @@ type TaskState struct {

const (
    TaskDriverFailure = "Driver Failure"
    TaskReceived      = "Received"
    TaskStarted       = "Started"
    TaskTerminated    = "Terminated"
    TaskKilled        = "Killed"
    TaskRestarting    = "Restarting"
    TaskNotRestarting = "Restarts Exceeded"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
@@ -163,4 +166,5 @@ type TaskEvent struct {
    Signal     int
    Message    string
    KillError  string
    StartDelay int64
}
@@ -245,8 +245,7 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
        case structs.TaskStatePending:
            pending = true
        case structs.TaskStateDead:
            last := len(state.Events) - 1
            if state.Events[last].Type == structs.TaskDriverFailure {
            if state.Failed() {
                failed = true
            } else {
                dead = true
@@ -8,6 +8,7 @@ import (
    "os"
    "os/exec"
    "path/filepath"
    "regexp"
    "strconv"
    "strings"
    "sync"
@@ -209,32 +210,9 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task,

    hostConfig := &docker.HostConfig{
        // Convert MB to bytes. This is an absolute value.
        //
        // This value represents the total amount of memory a process can use.
        // Swap is added to total memory and is managed by the OS, not docker.
        // Since this may cause other processes to swap and cause system
        // instability, we will simply not use swap.
        //
        // See: https://www.kernel.org/doc/Documentation/cgroups/memory.txt
        Memory:     int64(task.Resources.MemoryMB) * 1024 * 1024,
        MemorySwap: -1,
        // Convert Mhz to shares. This is a relative value.
        //
        // There are two types of CPU limiters available: Shares and Quotas. A
        // Share allows a particular process to have a proportion of CPU time
        // relative to other processes; 1024 by default. A CPU Quota is enforced
        // over a Period of time and is a HARD limit on the amount of CPU time a
        // process can use. Processes with quotas cannot burst, while processes
        // with shares can, so we'll use shares.
        //
        // The simplest scale is 1 share to 1 MHz so 1024 = 1GHz. This means any
        // given process will have at least that amount of resources, but likely
        // more since it is (probably) rare that the machine will run at 100%
        // CPU. This scale will cease to work if a node is overprovisioned.
        //
        // See:
        // - https://www.kernel.org/doc/Documentation/scheduler/sched-bwc.txt
        // - https://www.kernel.org/doc/Documentation/scheduler/sched-design-CFS.txt
        CPUShares: int64(task.Resources.CPU),

        // Binds are used to mount a host volume into the container. We mount a
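
The comment block above encodes two conversions: memory is an absolute limit (MB to bytes), while CPU is a relative weight at roughly one share per MHz (1024 shares is about 1 GHz). A quick sketch of the arithmetic, using made-up task resources rather than anything from this diff:

    // Hypothetical resources: 256 MB of memory, 500 MHz of CPU.
    memoryMB, cpuMHz := 256, 500
    memoryBytes := int64(memoryMB) * 1024 * 1024 // 268435456 -> HostConfig.Memory
    cpuShares := int64(cpuMHz)                   // 500 shares -> HostConfig.CPUShares
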
@@ -403,6 +381,22 @@ func (d *DockerDriver) createContainer(ctx *ExecContext, task *structs.Task,
    }, nil
}

var (
    // imageNotFoundMatcher is a regex expression that matches the image not
    // found error Docker returns.
    imageNotFoundMatcher = regexp.MustCompile(`Error: image .+ not found`)
)

// recoverablePullError wraps the error gotten when trying to pull and image if
// the error is recoverable.
func (d *DockerDriver) recoverablePullError(err error, image string) error {
    recoverable := true
    if imageNotFoundMatcher.MatchString(err.Error()) {
        recoverable = false
    }
    return cstructs.NewRecoverableError(fmt.Errorf("Failed to pull `%s`: %s", image, err), recoverable)
}

func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle, error) {
    var driverConfig DockerDriverConfig
    if err := mapstructure.WeakDecode(task.Config, &driverConfig); err != nil {
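
The regex above is what decides recoverability: a pull failure whose message matches the `Error: image ... not found` shape is treated as permanent, while anything else (say, a transient registry hiccup) stays retryable. A small sketch with made-up error strings:

    notFound := fmt.Errorf("Error: image library/missing:latest not found")
    flaky := fmt.Errorf("request canceled while waiting for connection")
    fmt.Println(imageNotFoundMatcher.MatchString(notFound.Error())) // true  -> recoverable = false
    fmt.Println(imageNotFoundMatcher.MatchString(flaky.Error()))    // false -> recoverable = true
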
@@ -482,7 +476,7 @@ func (d *DockerDriver) Start(ctx *ExecContext, task *structs.Task) (DriverHandle
        err = client.PullImage(pullOptions, authOptions)
        if err != nil {
            d.logger.Printf("[ERR] driver.docker: failed pulling container %s:%s: %s", repo, tag, err)
            return nil, fmt.Errorf("Failed to pull `%s`: %s", image, err)
            return nil, d.recoverablePullError(err, image)
        }
        d.logger.Printf("[DEBUG] driver.docker: docker pull %s:%s succeeded", repo, tag)
@@ -277,8 +277,7 @@ func (h *execHandle) run() {
            h.logger.Printf("[ERR] driver.exec: unmounting dev,proc and alloc dirs failed: %v", e)
        }
    }
    h.waitCh <- &cstructs.WaitResult{ExitCode: ps.ExitCode, Signal: 0,
        Err: err}
    h.waitCh <- cstructs.NewWaitResult(ps.ExitCode, 0, err)
    close(h.waitCh)
    h.pluginClient.Kill()
}
@@ -2,6 +2,7 @@ package structs

import (
    "fmt"

    cgroupConfig "github.com/opencontainers/runc/libcontainer/configs"
)
@@ -34,3 +35,23 @@ func (r *WaitResult) String() string {
type IsolationConfig struct {
    Cgroup *cgroupConfig.Cgroup
}

// RecoverableError wraps an error and marks whether it is recoverable and could
// be retried or it is fatal.
type RecoverableError struct {
    Err         error
    Recoverable bool
}

// NewRecoverableError is used to wrap an error and mark it as recoverable or
// not.
func NewRecoverableError(e error, recoverable bool) *RecoverableError {
    return &RecoverableError{
        Err:         e,
        Recoverable: recoverable,
    }
}

func (r *RecoverableError) Error() string {
    return r.Err.Error()
}
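
Producers wrap an error with NewRecoverableError and consumers check it with a type assertion, which is what the docker driver and the restart tracker do elsewhere in this diff. A minimal sketch (the wrapped message is arbitrary):

    var err error = NewRecoverableError(fmt.Errorf("registry unreachable"), true)
    if rerr, ok := err.(*RecoverableError); ok && rerr.Recoverable {
        // The operation may be retried; rerr.Error() still surfaces the original message.
    }
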
@@ -5,6 +5,7 @@ import (
    "sync"
    "time"

    cstructs "github.com/hashicorp/nomad/client/driver/structs"
    "github.com/hashicorp/nomad/nomad/structs"
)
@@ -25,6 +26,8 @@ func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTr
}

type RestartTracker struct {
    waitRes   *cstructs.WaitResult
    startErr  error
    count     int       // Current number of attempts.
    onSuccess bool      // Whether to restart on successful exit code.
    startTime time.Time // When the interval began
@@ -40,46 +43,107 @@ func (r *RestartTracker) SetPolicy(policy *structs.RestartPolicy) {
    r.policy = policy
}

// NextRestart takes the exit code from the last attempt and returns whether the
// task should be restarted and the duration to wait.
func (r *RestartTracker) NextRestart(exitCode int) (bool, time.Duration) {
// SetStartError is used to mark the most recent start error. If starting was
// successful the error should be nil.
func (r *RestartTracker) SetStartError(err error) *RestartTracker {
    r.lock.Lock()
    defer r.lock.Unlock()
    r.startErr = err
    return r
}

// SetWaitResult is used to mark the most recent wait result.
func (r *RestartTracker) SetWaitResult(res *cstructs.WaitResult) *RestartTracker {
    r.lock.Lock()
    defer r.lock.Unlock()
    r.waitRes = res
    return r
}

// GetState returns the tasks next state given the set exit code and start
// error. One of the following states are returned:
// * TaskRestarting - Task should be restarted
// * TaskNotRestarting - Task should not be restarted and has exceeded its
// restart policy.
// * TaskTerminated - Task has terminated successfully and does not need a
// restart.
//
// If TaskRestarting is returned, the duration is how long to wait until
// starting the task again.
func (r *RestartTracker) GetState() (string, time.Duration) {
    r.lock.Lock()
    defer r.lock.Unlock()

    // Hot path if no attempts are expected
    if r.policy.Attempts == 0 {
        return false, 0
        if r.waitRes != nil && r.waitRes.Successful() {
            return structs.TaskTerminated, 0
        }

        return structs.TaskNotRestarting, 0
    }

    r.count++

    // Check if we have entered a new interval.
    end := r.startTime.Add(r.policy.Interval)
    now := time.Now()
    if now.After(end) {
        r.count = 0
        r.startTime = now
        return r.shouldRestart(exitCode), r.jitter()
    }

    r.count++

    // If we are under the attempts, restart with delay.
    if r.count <= r.policy.Attempts {
        return r.shouldRestart(exitCode), r.jitter()
    if r.startErr != nil {
        return r.handleStartError()
    } else if r.waitRes != nil {
        return r.handleWaitResult()
    } else {
        return "", 0
    }

    // Don't restart since mode is "fail"
    if r.policy.Mode == structs.RestartPolicyModeFail {
        return false, 0
    }

    // Apply an artifical wait to enter the next interval
    return r.shouldRestart(exitCode), end.Sub(now)
}

// shouldRestart returns whether a restart should occur based on the exit code
// and job type.
func (r *RestartTracker) shouldRestart(exitCode int) bool {
    return exitCode != 0 || r.onSuccess
// handleStartError returns the new state and potential wait duration for
// restarting the task after it was not successfully started. On start errors,
// the restart policy is always treated as fail mode to ensure we don't
// infinitely try to start a task.
func (r *RestartTracker) handleStartError() (string, time.Duration) {
    // If the error is not recoverable, do not restart.
    if rerr, ok := r.startErr.(*cstructs.RecoverableError); !(ok && rerr.Recoverable) {
        return structs.TaskNotRestarting, 0
    }

    if r.count > r.policy.Attempts {
        return structs.TaskNotRestarting, 0
    }

    return structs.TaskRestarting, r.jitter()
}

// handleWaitResult returns the new state and potential wait duration for
// restarting the task after it has exited.
func (r *RestartTracker) handleWaitResult() (string, time.Duration) {
    // If the task started successfully and restart on success isn't specified,
    // don't restart but don't mark as failed.
    if r.waitRes.Successful() && !r.onSuccess {
        return structs.TaskTerminated, 0
    }

    if r.count > r.policy.Attempts {
        if r.policy.Mode == structs.RestartPolicyModeFail {
            return structs.TaskNotRestarting, 0
        } else {
            return structs.TaskRestarting, r.getDelay()
        }
    }

    return structs.TaskRestarting, r.jitter()
}

// getDelay returns the delay time to enter the next interval.
func (r *RestartTracker) getDelay() time.Duration {
    end := r.startTime.Add(r.policy.Interval)
    now := time.Now()
    return end.Sub(now)
}

// jitter returns the delay time plus a jitter.
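
Taken together, callers now record the most recent outcome and then ask the tracker for a decision, instead of feeding an exit code to NextRestart. A rough usage sketch, assuming policy is a *structs.RestartPolicy and err is whatever the driver returned:

    rt := newRestartTracker(policy, structs.JobTypeService)

    // After a failed driver start:
    state, when := rt.SetStartError(err).GetState()

    // After the task exits with code 1:
    state, when = rt.SetWaitResult(cstructs.NewWaitResult(1, 0, nil)).GetState()

    // state is TaskRestarting, TaskNotRestarting, or TaskTerminated;
    // when is only meaningful when state == structs.TaskRestarting.
    _, _ = state, when
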
@@ -1,9 +1,11 @@
package client

import (
    "fmt"
    "testing"
    "time"

    cstructs "github.com/hashicorp/nomad/client/driver/structs"
    "github.com/hashicorp/nomad/nomad/structs"
)
@@ -23,14 +25,18 @@ func withinJitter(expected, actual time.Duration) bool {
        expected.Nanoseconds()) <= jitter
}

func testWaitResult(exit int) *cstructs.WaitResult {
    return cstructs.NewWaitResult(exit, 0, nil)
}

func TestClient_RestartTracker_ModeDelay(t *testing.T) {
    t.Parallel()
    p := testPolicy(true, structs.RestartPolicyModeDelay)
    rt := newRestartTracker(p, structs.JobTypeService)
    for i := 0; i < p.Attempts; i++ {
        actual, when := rt.NextRestart(127)
        if !actual {
            t.Fatalf("NextRestart() returned %v, want %v", actual, true)
        state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
        if state != structs.TaskRestarting {
            t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
        }
        if !withinJitter(p.Delay, when) {
            t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
@@ -39,8 +45,8 @@ func TestClient_RestartTracker_ModeDelay(t *testing.T) {

    // Follow up restarts should cause delay.
    for i := 0; i < 3; i++ {
        actual, when := rt.NextRestart(127)
        if !actual {
        state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
        if state != structs.TaskRestarting {
            t.Fail()
        }
        if !(when > p.Delay && when <= p.Interval) {
@@ -54,9 +60,9 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) {
    p := testPolicy(true, structs.RestartPolicyModeFail)
    rt := newRestartTracker(p, structs.JobTypeSystem)
    for i := 0; i < p.Attempts; i++ {
        actual, when := rt.NextRestart(127)
        if !actual {
            t.Fatalf("NextRestart() returned %v, want %v", actual, true)
        state, when := rt.SetWaitResult(testWaitResult(127)).GetState()
        if state != structs.TaskRestarting {
            t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
        }
        if !withinJitter(p.Delay, when) {
            t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
@@ -64,8 +70,8 @@ func TestClient_RestartTracker_ModeFail(t *testing.T) {
    }

    // Next restart should cause fail
    if actual, _ := rt.NextRestart(127); actual {
        t.Fail()
    if state, _ := rt.SetWaitResult(testWaitResult(127)).GetState(); state != structs.TaskNotRestarting {
        t.Fatalf("NextRestart() returned %v; want %v", state, structs.TaskNotRestarting)
    }
}
@@ -73,8 +79,8 @@ func TestClient_RestartTracker_NoRestartOnSuccess(t *testing.T) {
    t.Parallel()
    p := testPolicy(false, structs.RestartPolicyModeDelay)
    rt := newRestartTracker(p, structs.JobTypeBatch)
    if shouldRestart, _ := rt.NextRestart(0); shouldRestart {
        t.Fatalf("NextRestart() returned %v, expected: %v", shouldRestart, false)
    if state, _ := rt.SetWaitResult(testWaitResult(0)).GetState(); state != structs.TaskTerminated {
        t.Fatalf("NextRestart() returned %v, expected: %v", state, structs.TaskTerminated)
    }
}
@@ -83,7 +89,28 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
    p := testPolicy(true, structs.RestartPolicyModeFail)
    p.Attempts = 0
    rt := newRestartTracker(p, structs.JobTypeService)
    if actual, when := rt.NextRestart(1); actual {
    if state, when := rt.SetWaitResult(testWaitResult(1)).GetState(); state != structs.TaskNotRestarting {
        t.Fatalf("expect no restart, got restart/delay: %v", when)
    }
}

func TestClient_RestartTracker_StartError_Recoverable(t *testing.T) {
    t.Parallel()
    p := testPolicy(true, structs.RestartPolicyModeDelay)
    rt := newRestartTracker(p, structs.JobTypeSystem)
    recErr := cstructs.NewRecoverableError(fmt.Errorf("foo"), true)
    for i := 0; i < p.Attempts; i++ {
        state, when := rt.SetStartError(recErr).GetState()
        if state != structs.TaskRestarting {
            t.Fatalf("NextRestart() returned %v, want %v", state, structs.TaskRestarting)
        }
        if !withinJitter(p.Delay, when) {
            t.Fatalf("NextRestart() returned %v; want %v+jitter", when, p.Delay)
        }
    }

    // Next restart should cause fail
    if state, _ := rt.SetStartError(recErr).GetState(); state != structs.TaskNotRestarting {
        t.Fatalf("NextRestart() returned %v; want %v", state, structs.TaskNotRestarting)
    }
}
@@ -203,33 +203,6 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
|
||||
return driver, err
|
||||
}
|
||||
|
||||
// startTask is used to start the task if there is no handle
|
||||
func (r *TaskRunner) startTask() error {
|
||||
// Create a driver
|
||||
driver, err := r.createDriver()
|
||||
if err != nil {
|
||||
e := structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err)
|
||||
r.setState(structs.TaskStateDead, e)
|
||||
return err
|
||||
}
|
||||
|
||||
// Start the job
|
||||
handle, err := driver.Start(r.ctx, r.task)
|
||||
if err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
|
||||
r.task.Name, r.alloc.ID, err)
|
||||
e := structs.NewTaskEvent(structs.TaskDriverFailure).
|
||||
SetDriverError(fmt.Errorf("failed to start: %v", err))
|
||||
r.setState(structs.TaskStateDead, e)
|
||||
return err
|
||||
}
|
||||
r.handleLock.Lock()
|
||||
r.handle = handle
|
||||
r.handleLock.Unlock()
|
||||
r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
|
||||
return nil
|
||||
}
|
||||
|
||||
// Run is a long running routine used to manage the task
|
||||
func (r *TaskRunner) Run() {
|
||||
defer close(r.waitCh)
|
||||
@@ -241,33 +214,48 @@
}

func (r *TaskRunner) run() {
    var forceStart bool
    for {
        // Start the task if not yet started or it is being forced.
        // Start the task if not yet started or it is being forced. This logic
        // is necessary because in the case of a restore the handle already
        // exists.
        r.handleLock.Lock()
        handleEmpty := r.handle == nil
        r.handleLock.Unlock()
        if handleEmpty || forceStart {
            forceStart = false
            if err := r.startTask(); err != nil {
                return
        if handleEmpty {
            startErr := r.startTask()
            r.restartTracker.SetStartError(startErr)
            if startErr != nil {
                r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr))
                goto RESTART
            }
        }

        // Store the errors that caused use to stop waiting for updates.
        var waitRes *cstructs.WaitResult
        var destroyErr error
        destroyed := false

        // Register the services defined by the task with Consil
        // Mark the task as started and register it with Consul.
        r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
        r.consulService.Register(r.task, r.alloc)

    OUTER:
        // Wait for updates
    WAIT:
        for {
            select {
            case waitRes = <-r.handle.WaitCh():
                break OUTER
            case waitRes := <-r.handle.WaitCh():
                // De-Register the services belonging to the task from consul
                r.consulService.Deregister(r.task, r.alloc)

                if waitRes == nil {
                    panic("nil wait")
                }

                // Log whether the task was successful or not.
                r.restartTracker.SetWaitResult(waitRes)
                r.setState(structs.TaskStateDead, r.waitErrorToEvent(waitRes))
                if !waitRes.Successful() {
                    r.logger.Printf("[INFO] client: task %q for alloc %q failed: %v", r.task.Name, r.alloc.ID, waitRes)
                } else {
                    r.logger.Printf("[INFO] client: task %q for alloc %q completed successfully", r.task.Name, r.alloc.ID)
                }

                break WAIT
            case update := <-r.updateCh:
                if err := r.handleUpdate(update); err != nil {
                    r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
@@ -278,50 +266,32 @@ func (r *TaskRunner) run() {
                if !destroySuccess {
                    // We couldn't successfully destroy the resource created.
                    r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
                } else {
                    // Wait for the task to exit but cap the time to ensure we don't block.
                    select {
                    case waitRes = <-r.handle.WaitCh():
                    case <-time.After(3 * time.Second):
                    }
                }

                // Store that the task has been destroyed and any associated error.
                destroyed = true
                destroyErr = err
                break OUTER
                r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
                r.consulService.Deregister(r.task, r.alloc)
                return
            }
        }

        // De-Register the services belonging to the task from consul
        r.consulService.Deregister(r.task, r.alloc)

        // If the user destroyed the task, we do not attempt to do any restarts.
        if destroyed {
            r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
            return
        }

        // Log whether the task was successful or not.
        if !waitRes.Successful() {
            r.logger.Printf("[ERR] client: failed to complete task '%s' for alloc '%s': %v", r.task.Name, r.alloc.ID, waitRes)
        } else {
            r.logger.Printf("[INFO] client: completed task '%s' for alloc '%s'", r.task.Name, r.alloc.ID)
        }

        // Check if we should restart. If not mark task as dead and exit.
        shouldRestart, when := r.restartTracker.NextRestart(waitRes.ExitCode)
        waitEvent := r.waitErrorToEvent(waitRes)
        if !shouldRestart {
    RESTART:
        state, when := r.restartTracker.GetState()
        switch state {
        case structs.TaskNotRestarting, structs.TaskTerminated:
            r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
            r.setState(structs.TaskStateDead, waitEvent)
            if state == structs.TaskNotRestarting {
                r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting))
            }
            return
        case structs.TaskRestarting:
            r.logger.Printf("[INFO] client: Restarting task %q for alloc %q in %v", r.task.Name, r.alloc.ID, when)
            r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when))
        default:
            r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state)
            return
        }

        r.logger.Printf("[INFO] client: Restarting Task: %v", r.task.Name)
        r.logger.Printf("[DEBUG] client: Sleeping for %v before restarting Task %v", when, r.task.Name)
        r.setState(structs.TaskStatePending, waitEvent)

        // Sleep but watch for destroy events.
        select {
        case <-time.After(when):
@@ -330,7 +300,7 @@ func (r *TaskRunner) run() {

        // Destroyed while we were waiting to restart, so abort.
        r.destroyLock.Lock()
        destroyed = r.destroy
        destroyed := r.destroy
        r.destroyLock.Unlock()
        if destroyed {
            r.logger.Printf("[DEBUG] client: Not restarting task: %v because it's destroyed by user", r.task.Name)
@@ -338,11 +308,34 @@ func (r *TaskRunner) run() {
            return
        }

        // Set force start because we are restarting the task.
        forceStart = true
        // Clear the handle so a new driver will be created.
        r.handle = nil
    }
}

func (r *TaskRunner) startTask() error {
    // Create a driver
    driver, err := r.createDriver()
    if err != nil {
        r.logger.Printf("[ERR] client: failed to create driver of task '%s' for alloc '%s': %v",
            r.task.Name, r.alloc.ID, err)
        return err
    }

    // Start the job
    handle, err := driver.Start(r.ctx, r.task)
    if err != nil {
        r.logger.Printf("[ERR] client: failed to start task '%s' for alloc '%s': %v",
            r.task.Name, r.alloc.ID, err)
        return err
    }

    r.handleLock.Lock()
    r.handle = handle
    r.handleLock.Unlock()
    return nil
}

// handleUpdate takes an updated allocation and updates internal state to
// reflect the new config for the task.
func (r *TaskRunner) handleUpdate(update *structs.Allocation) error {
@@ -96,14 +96,29 @@ func TestTaskRunner_Destroy(t *testing.T) {

    // Change command to ensure we run for a bit
    tr.task.Config["command"] = "/bin/sleep"
    tr.task.Config["args"] = []string{"10"}
    tr.task.Config["args"] = []string{"1000"}
    go tr.Run()

    testutil.WaitForResult(func() (bool, error) {
        if l := len(upd.events); l != 2 {
            return false, fmt.Errorf("Expect two events; got %v", l)
        }

        if upd.events[0].Type != structs.TaskReceived {
            return false, fmt.Errorf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
        }

        if upd.events[1].Type != structs.TaskStarted {
            return false, fmt.Errorf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
        }

        return true, nil
    }, func(err error) {
        t.Fatalf("err: %v", err)
    })

    // Begin the tear down
    go func() {
        time.Sleep(100 * time.Millisecond)
        tr.Destroy()
    }()
    tr.Destroy()

    select {
    case <-tr.WaitCh():
@@ -119,14 +134,6 @@ func TestTaskRunner_Destroy(t *testing.T) {
        t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
    }

    if upd.events[0].Type != structs.TaskReceived {
        t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
    }

    if upd.events[1].Type != structs.TaskStarted {
        t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
    }

    if upd.events[2].Type != structs.TaskKilled {
        t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilled)
    }
@@ -190,10 +190,22 @@ func (c *AllocStatusCommand) taskStatus(alloc *api.Allocation) {
            // Build up the description based on the event type.
            var desc string
            switch event.Type {
            case api.TaskStarted:
                desc = "Task started by client"
            case api.TaskReceived:
                desc = "Task received by client"
            case api.TaskDriverFailure:
                desc = event.DriverError
                if event.DriverError != "" {
                    desc = event.DriverError
                } else {
                    desc = "Failed to start task"
                }
            case api.TaskKilled:
                desc = event.KillError
                if event.KillError != "" {
                    desc = event.KillError
                } else {
                    desc = "Task successfully killed"
                }
            case api.TaskTerminated:
                var parts []string
                parts = append(parts, fmt.Sprintf("Exit Code: %d", event.ExitCode))
@@ -206,6 +218,10 @@ func (c *AllocStatusCommand) taskStatus(alloc *api.Allocation) {
                    parts = append(parts, fmt.Sprintf("Exit Message: %q", event.Message))
                }
                desc = strings.Join(parts, ", ")
            case api.TaskRestarting:
                desc = fmt.Sprintf("Task restarting in %v", time.Duration(event.StartDelay))
            case api.TaskNotRestarting:
                desc = "Task exceeded restart policy"
            }

            // Reverse order so we are sorted by time
@@ -1689,6 +1689,16 @@ func (ts *TaskState) Copy() *TaskState {
    return copy
}

// Failed returns if the task has has failed.
func (ts *TaskState) Failed() bool {
    l := len(ts.Events)
    if ts.State != TaskStateDead || l == 0 {
        return false
    }

    return ts.Events[l-1].Type == TaskNotRestarting
}

const (
    // A Driver failure indicates that the task could not be started due to a
    // failure in the driver.
@@ -1707,6 +1717,13 @@ const (

    // Task Killed indicates a user has killed the task.
    TaskKilled = "Killed"

    // TaskRestarting indicates that task terminated and is being restarted.
    TaskRestarting = "Restarting"

    // TaskNotRestarting indicates that the task has failed and is not being
    // restarted because it has exceeded its restart policy.
    TaskNotRestarting = "Restarts Exceeded"
)

// TaskEvent is an event that effects the state of a task and contains meta-data
@@ -1725,6 +1742,13 @@ type TaskEvent struct {

    // Task Killed Fields.
    KillError string // Error killing the task.

    // TaskRestarting fields.
    StartDelay int64 // The sleep period before restarting the task in unix nanoseconds.
}

func (te *TaskEvent) GoString() string {
    return fmt.Sprintf("%v at %v", te.Type, te.Time)
}

func (te *TaskEvent) Copy() *TaskEvent {
@@ -1774,6 +1798,11 @@ func (e *TaskEvent) SetKillError(err error) *TaskEvent {
    return e
}

func (e *TaskEvent) SetRestartDelay(delay time.Duration) *TaskEvent {
    e.StartDelay = int64(delay)
    return e
}
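
SetRestartDelay stores the delay as int64 nanoseconds, which is what lets the alloc-status command above print it directly via time.Duration. A small round-trip sketch (the 15-second value is arbitrary):

    e := NewTaskEvent(TaskRestarting).SetRestartDelay(15 * time.Second)
    fmt.Println(e.StartDelay)                // 15000000000
    fmt.Println(time.Duration(e.StartDelay)) // 15s, matching the CLI's formatting
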
// Validate is used to sanity check a task group
func (t *Task) Validate() error {
    var mErr multierror.Error