Change how we mark tasks as failed and allow consul-template to fail tasks
@@ -231,6 +231,7 @@ func (t *Task) SetLogConfig(l *LogConfig) *Task {
 // transitions.
 type TaskState struct {
     State  string
+    Failed bool
     Events []*TaskEvent
 }

@@ -259,6 +260,7 @@ const (
 type TaskEvent struct {
     Type          string
     Time          int64
+    FailsTask     bool
     RestartReason string
     SetupError    string
     DriverError   string
@@ -279,7 +279,7 @@ func (r *AllocRunner) Alloc() *structs.Allocation {
         case structs.TaskStatePending:
             pending = true
         case structs.TaskStateDead:
-            if state.Failed() {
+            if state.Failed {
                 failed = true
             } else {
                 dead = true

@@ -346,11 +346,14 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv

     // Set the tasks state.
     taskState.State = state
+    if event.FailsTask {
+        taskState.Failed = true
+    }
     r.appendTaskEvent(taskState, event)

     if state == structs.TaskStateDead {
         // If the task failed, we should kill all the other tasks in the task group.
-        if taskState.Failed() {
+        if taskState.Failed {
             var destroyingTasks []string
             for task, tr := range r.tasks {
                 if task != taskName {
@@ -75,9 +75,17 @@ func TestAllocRunner_RetryArtifact(t *testing.T) {

     alloc := mock.Alloc()
     alloc.Job.Type = structs.JobTypeBatch
+    alloc.Job.TaskGroups[0].RestartPolicy.Mode = structs.RestartPolicyModeFail
+    alloc.Job.TaskGroups[0].RestartPolicy.Attempts = 1
+    alloc.Job.TaskGroups[0].RestartPolicy.Delay = time.Duration(4*testutil.TestMultiplier()) * time.Second

     task := alloc.Job.TaskGroups[0].Tasks[0]
+    task.Driver = "mock_driver"
+    task.Config = map[string]interface{}{
+        "exit_code": "0",
+        "run_for":   "1s",
+    }

     // Create a new task with a bad artifact
     badtask := alloc.Job.TaskGroups[0].Tasks[0].Copy()
     badtask.Name = "bad"

@@ -109,10 +117,10 @@ func TestAllocRunner_RetryArtifact(t *testing.T) {
         // bad task should have failed
         badstate := last.TaskStates["bad"]
         if badstate.State != structs.TaskStateDead {
-            return false, fmt.Errorf("expected bad to be dead but found %q", last.TaskStates["web"].State)
+            return false, fmt.Errorf("expected bad to be dead but found %q", badstate.State)
         }
-        if !badstate.Failed() {
-            return false, fmt.Errorf("expected bad to have failed")
+        if !badstate.Failed {
+            return false, fmt.Errorf("expected bad to have failed: %#v", badstate.Events)
         }
         return true, nil
     }, func(err error) {

@@ -610,7 +618,7 @@ func TestAllocRunner_TaskFailed_KillTG(t *testing.T) {
         if state2.State != structs.TaskStateDead {
             return false, fmt.Errorf("got state %v; want %v", state2.State, structs.TaskStateDead)
         }
-        if !state2.Failed() {
+        if !state2.Failed {
             return false, fmt.Errorf("task2 should have failed")
         }

@@ -36,8 +36,9 @@ type TaskHooks interface {
     // called after prestart work is completed
     UnblockStart(source string)

-    // Kill is used to kill the task because of the passed error.
-    Kill(source, reason string)
+    // Kill is used to kill the task because of the passed error. If fail is set
+    // to true, the task is marked as failed
+    Kill(source, reason string, fail bool)
 }

 // TaskTemplateManager is used to run a set of templates for a given task
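The fail flag on the hook is what lets consul-template mark a task as failed rather than merely killed. As a point of reference, a minimal sketch of a TaskHooks-style Kill implementation that records the flag; fakeHooks and its fields are illustrative only and are not part of this commit:

package main

import "fmt"

// Sketch only: a trivial Kill implementation that records whether the
// kill should also mark the task as failed.
type fakeHooks struct {
    killReason string
    failed     bool
}

func (f *fakeHooks) Kill(source, reason string, fail bool) {
    f.killReason = fmt.Sprintf("%s: %s", source, reason)
    f.failed = fail // fail=true means the task ends up Failed, not merely dead
}

func main() {
    h := &fakeHooks{}
    // A fatal template error now fails the task instead of only killing it.
    h.Kill("consul-template", "template runner returned unknown template id", true)
    fmt.Println(h.killReason, h.failed)
}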
@@ -171,7 +172,7 @@ func (tm *TaskTemplateManager) run() {
                 continue
             }

-            tm.hook.Kill("consul-template", err.Error())
+            tm.hook.Kill("consul-template", err.Error(), true)
         case <-tm.runner.TemplateRenderedCh():
             // A template has been rendered, figure out what to do
             events := tm.runner.RenderEvents()

@@ -218,7 +219,7 @@ func (tm *TaskTemplateManager) run() {
                 continue
             }

-            tm.hook.Kill("consul-template", err.Error())
+            tm.hook.Kill("consul-template", err.Error(), true)
         case <-tm.runner.TemplateRenderedCh():
             // A template has been rendered, figure out what to do
             var handling []string

@@ -243,7 +244,7 @@ func (tm *TaskTemplateManager) run() {
                 // Lookup the template and determine what to do
                 tmpls, ok := tm.lookup[id]
                 if !ok {
-                    tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id))
+                    tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id), true)
                     return
                 }

@@ -296,7 +297,7 @@ func (tm *TaskTemplateManager) run() {
                     for signal := range signals {
                         flat = append(flat, tm.signals[signal])
                     }
-                    tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err))
+                    tm.hook.Kill("consul-template", fmt.Sprintf("Sending signals %v failed: %v", flat, err), true)
                 }
             }
         }
@@ -62,7 +62,7 @@ func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) error {
     return m.SignalError
 }

-func (m *MockTaskHooks) Kill(source, reason string) {
+func (m *MockTaskHooks) Kill(source, reason string, fail bool) {
     m.KillReason = reason
     select {
     case m.KillCh <- struct{}{}:
@@ -370,7 +370,7 @@ func (r *TaskRunner) Run() {
     if err := r.validateTask(); err != nil {
         r.setState(
             structs.TaskStateDead,
-            structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err))
+            structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err).SetFailsTask())
         return
     }

@@ -520,7 +520,7 @@ OUTER:
             if err := r.writeToken(token); err != nil {
                 e := fmt.Errorf("failed to write Vault token to disk")
                 r.logger.Printf("[ERR] client: %v for task %v on alloc %q: %v", e, r.task.Name, r.alloc.ID, err)
-                r.Kill("vault", e.Error())
+                r.Kill("vault", e.Error(), true)
                 return
             }
         }
@@ -545,13 +545,13 @@ OUTER:
                 if err != nil {
                     e := fmt.Errorf("failed to parse signal: %v", err)
                     r.logger.Printf("[ERR] client: %v", err)
-                    r.Kill("vault", e.Error())
+                    r.Kill("vault", e.Error(), true)
                     return
                 }

                 if err := r.Signal("vault", "new Vault token acquired", s); err != nil {
                     r.logger.Printf("[ERR] client: failed to send signal to task %v for alloc %q: %v", r.task.Name, r.alloc.ID, err)
-                    r.Kill("vault", fmt.Sprintf("failed to send signal to task: %v", err))
+                    r.Kill("vault", fmt.Sprintf("failed to send signal to task: %v", err), true)
                     return
                 }
             case structs.VaultChangeModeRestart:
@@ -640,7 +640,7 @@ func (r *TaskRunner) updatedTokenHandler() {
     if err := r.setTaskEnv(); err != nil {
         r.setState(
             structs.TaskStateDead,
-            structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
+            structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err).SetFailsTask())
         return
     }

@@ -653,7 +653,7 @@ func (r *TaskRunner) updatedTokenHandler() {
         r.config, r.vaultFuture.Get(), r.taskDir, r.getTaskEnv())
     if err != nil {
         err := fmt.Errorf("failed to build task's template manager: %v", err)
-        r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err))
+        r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
         r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
         return
     }

@@ -679,7 +679,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
     if err := r.setTaskEnv(); err != nil {
         r.setState(
             structs.TaskStateDead,
-            structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
+            structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err).SetFailsTask())
         resultCh <- false
         return
     }

@@ -691,7 +691,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
         r.config, r.vaultFuture.Get(), r.taskDir, r.getTaskEnv())
     if err != nil {
         err := fmt.Errorf("failed to build task's template manager: %v", err)
-        r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err))
+        r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
         r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
         resultCh <- false
         return

@@ -703,6 +703,7 @@ func (r *TaskRunner) prestart(resultCh chan bool) {
     if !r.artifactsDownloaded && len(r.task.Artifacts) > 0 {
         r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
         for _, artifact := range r.task.Artifacts {
+            // TODO wrap
             if err := getter.GetArtifact(r.getTaskEnv(), artifact, r.taskDir); err != nil {
                 r.setState(structs.TaskStatePending,
                     structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(err))
@@ -850,7 +851,7 @@ func (r *TaskRunner) run() {
         case event := <-r.restartCh:
             r.logger.Printf("[DEBUG] client: task being restarted: %s", event.RestartReason)
             r.setState(structs.TaskStateRunning, event)
-            r.killTask(event.RestartReason)
+            r.killTask(nil)

             close(stopCollection)

@@ -871,16 +872,16 @@ func (r *TaskRunner) run() {
             // Store the task event that provides context on the task
             // destroy. The Killed event is set from the alloc_runner and
             // doesn't add detail
-            reason := ""
+            var killEvent *structs.TaskEvent
             if r.destroyEvent.Type != structs.TaskKilled {
                 if r.destroyEvent.Type == structs.TaskKilling {
-                    reason = r.destroyEvent.KillReason
+                    killEvent = r.destroyEvent
                 } else {
                     r.setState(structs.TaskStateRunning, r.destroyEvent)
                 }
             }

-            r.killTask(reason)
+            r.killTask(killEvent)
             close(stopCollection)
             return
         }
@@ -913,7 +914,7 @@ func (r *TaskRunner) shouldRestart() bool {
         if state == structs.TaskNotRestarting {
             r.setState(structs.TaskStateDead,
                 structs.NewTaskEvent(structs.TaskNotRestarting).
-                    SetRestartReason(reason))
+                    SetRestartReason(reason).SetFailsTask())
         }
         return false
     case structs.TaskRestarting:

@@ -938,7 +939,7 @@ func (r *TaskRunner) shouldRestart() bool {
     destroyed := r.destroy
     r.destroyLock.Unlock()
     if destroyed {
-        r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed due to: %s", r.task.Name, r.destroyEvent.Message)
+        r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed", r.task.Name)
         r.setState(structs.TaskStateDead, r.destroyEvent)
         return false
     }
@@ -946,8 +947,10 @@ func (r *TaskRunner) shouldRestart() bool {
     return true
 }

-// killTask kills the running task, storing the reason in the Killing TaskEvent.
-func (r *TaskRunner) killTask(reason string) {
+// killTask kills the running task. A killing event can optionally be passed and
+// this event is used to mark the task as being killed. It provides a means to
+// store extra information.
+func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) {
     r.runningLock.Lock()
     running := r.running
     r.runningLock.Unlock()

@@ -955,10 +958,21 @@ func (r *TaskRunner) killTask(reason string) {
         return
     }

-    // Mark that we received the kill event
+    // Get the kill timeout
     timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
-    r.setState(structs.TaskStateRunning,
-        structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout).SetKillReason(reason))
+
+    // Build the event
+    var event *structs.TaskEvent
+    if killingEvent != nil {
+        event = killingEvent
+        event.Type = structs.TaskKilling
+    } else {
+        event = structs.NewTaskEvent(structs.TaskKilling)
+    }
+    event.SetKillTimeout(timeout)
+
+    // Mark that we received the kill event
+    r.setState(structs.TaskStateRunning, event)

     // Kill the task using an exponential backoff in-case of failures.
     destroySuccess, err := r.handleDestroy()
@@ -1176,11 +1190,14 @@ func (r *TaskRunner) Signal(source, reason string, s os.Signal) error {
     return <-resCh
 }

-// Kill will kill a task and store the error, no longer restarting the task
-// TODO need to be able to fail the task
-func (r *TaskRunner) Kill(source, reason string) {
+// Kill will kill a task and store the error, no longer restarting the task. If
+// fail is set, the task is marked as having failed.
+func (r *TaskRunner) Kill(source, reason string, fail bool) {
     reasonStr := fmt.Sprintf("%s: %s", source, reason)
     event := structs.NewTaskEvent(structs.TaskKilling).SetKillReason(reasonStr)
+    if fail {
+        event.SetFailsTask()
+    }

     r.logger.Printf("[DEBUG] client: killing task %v for alloc %q: %v", r.task.Name, r.alloc.ID, reasonStr)
     r.Destroy(event)
@@ -33,11 +33,15 @@ func prefixedTestLogger(prefix string) *log.Logger {

 type MockTaskStateUpdater struct {
     state  string
+    failed bool
     events []*structs.TaskEvent
 }

 func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent) {
     m.state = state
+    if event.FailsTask {
+        m.failed = true
+    }
     m.events = append(m.events, event)
 }

@@ -479,7 +483,7 @@ func TestTaskRunner_RestartTask(t *testing.T) {
         time.Sleep(100 * time.Millisecond)
         tr.Restart("test", "restart")
         time.Sleep(100 * time.Millisecond)
-        tr.Kill("test", "restart")
+        tr.Kill("test", "restart", false)
     }()

     select {

@@ -550,7 +554,7 @@ func TestTaskRunner_KillTask(t *testing.T) {

     go func() {
         time.Sleep(100 * time.Millisecond)
-        tr.Kill("test", "kill")
+        tr.Kill("test", "kill", true)
     }()

     select {
@@ -567,6 +571,10 @@ func TestTaskRunner_KillTask(t *testing.T) {
         t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
     }

+    if !upd.failed {
+        t.Fatalf("TaskState should be failed: %+v", upd)
+    }
+
     if upd.events[0].Type != structs.TaskReceived {
         t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
     }
@@ -2318,6 +2318,9 @@ type TaskState struct {
     // The current state of the task.
     State string

+    // Failed marks a task as having failed
+    Failed bool
+
     // Series of task events that transition the state of the task.
     Events []*TaskEvent
 }

@@ -2328,6 +2331,7 @@ func (ts *TaskState) Copy() *TaskState {
     }
     copy := new(TaskState)
     copy.State = ts.State
+    copy.Failed = ts.Failed

     if ts.Events != nil {
         copy.Events = make([]*TaskEvent, len(ts.Events))

@@ -2338,22 +2342,6 @@ func (ts *TaskState) Copy() *TaskState {
     return copy
 }

-// Failed returns true if the task has has failed.
-func (ts *TaskState) Failed() bool {
-    l := len(ts.Events)
-    if ts.State != TaskStateDead || l == 0 {
-        return false
-    }
-
-    switch ts.Events[l-1].Type {
-    case TaskDiskExceeded, TaskNotRestarting, TaskArtifactDownloadFailed,
-        TaskFailedValidation, TaskVaultRenewalFailed, TaskSetupFailure:
-        return true
-    default:
-        return false
-    }
-}
-
 // Successful returns whether a task finished successfully.
 func (ts *TaskState) Successful() bool {
     l := len(ts.Events)
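Taken together, these structs.go hunks replace the derived Failed() method with a stored Failed field that is latched when an event carrying FailsTask is applied. A minimal sketch of the new flow under that reading; the trimmed structs and the applyEvent helper are illustrative, not code from this commit:

package main

import "fmt"

// Trimmed-down copies of the structs touched by this commit, for illustration.
type TaskEvent struct {
    Type      string
    FailsTask bool
}

type TaskState struct {
    State  string
    Failed bool
    Events []*TaskEvent
}

// applyEvent mirrors what AllocRunner.setTaskState now does: the Failed flag
// is set when an event carrying FailsTask arrives, instead of being inferred
// later from the type of the last event.
func applyEvent(ts *TaskState, state string, ev *TaskEvent) {
    ts.State = state
    if ev.FailsTask {
        ts.Failed = true
    }
    ts.Events = append(ts.Events, ev)
}

func main() {
    ts := &TaskState{}
    applyEvent(ts, "dead", &TaskEvent{Type: "Not Restarting", FailsTask: true})
    fmt.Println(ts.Failed) // true, without inspecting ts.Events
}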
@@ -2428,9 +2416,6 @@ const (
     // TaskSiblingFailed indicates that a sibling task in the task group has
     // failed.
     TaskSiblingFailed = "Sibling task failed"
-
-    // TaskVaultRenewalFailed indicates that Vault token renewal failed
-    TaskVaultRenewalFailed = "Vault token renewal failed"
 )

 // TaskEvent is an event that effects the state of a task and contains meta-data
@@ -2439,6 +2424,9 @@ type TaskEvent struct {
     Type string
     Time int64 // Unix Nanosecond timestamp

+    // FailsTask marks whether this event fails the task
+    FailsTask bool
+
     // Restart fields.
     RestartReason string

@@ -2520,6 +2508,11 @@ func (e *TaskEvent) SetSetupError(err error) *TaskEvent {
     return e
 }

+func (e *TaskEvent) SetFailsTask() *TaskEvent {
+    e.FailsTask = true
+    return e
+}
+
 func (e *TaskEvent) SetDriverError(err error) *TaskEvent {
     if err != nil {
         e.DriverError = err.Error()
@@ -225,6 +225,7 @@ be specified using the `?region=` query parameter.
             }
           ],
           "State": "running"
+          "Failed": false,
         }
       },
       "CreateIndex": 7
@@ -249,6 +250,8 @@ be specified using the `?region=` query parameter.
 The latest 10 events are stored per task. Each event is timestamped (unix nano-seconds)
 and has one of the following types:

+* `Setup Failure` - The task could not be started because there was a
+  failure setting up the task prior to it running.
 * `Driver Failure` - The task could not be started due to a failure in the
   driver.
 * `Started` - The task was started; either for the first time or due to a
@@ -262,5 +265,8 @@ be specified using the `?region=` query parameter.
 * `Not Restarting` - the task has failed and is not being restarted because it has exceeded its restart policy.
 * `Downloading Artifacts` - The task is downloading the artifact(s) specified in the task.
 * `Failed Artifact Download` - Artifact(s) specified in the task failed to download.
+* `Restart Signaled` - The task was signalled to be restarted.
+* `Signaling` - The task is being sent a signal.
+* `Sibling task failed` - A task in the same task group failed.

 Depending on the type the event will have applicable annotations.
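With Failed exposed through the allocation API, a client can distinguish a task that failed from one that simply ran to completion. A minimal sketch using the Nomad Go API client; the allocation ID is a placeholder and error handling is kept to the bare minimum:

package main

import (
    "fmt"
    "log"

    "github.com/hashicorp/nomad/api"
)

func main() {
    client, err := api.NewClient(api.DefaultConfig())
    if err != nil {
        log.Fatal(err)
    }

    // Placeholder allocation ID; substitute a real one from `nomad status`.
    alloc, _, err := client.Allocations().Info("c0ffee00-0000-0000-0000-000000000000", nil)
    if err != nil {
        log.Fatal(err)
    }

    for name, ts := range alloc.TaskStates {
        // A dead task with Failed=false finished on its own; Failed=true means
        // an event carrying FailsTask (validation, setup, artifact, template,
        // vault, or restart-policy failure) ended it.
        fmt.Printf("task %s: state=%s failed=%v\n", name, ts.State, ts.Failed)
    }
}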