diff --git a/api/tasks.go b/api/tasks.go
index 48e536810..8466c8cb3 100644
--- a/api/tasks.go
+++ b/api/tasks.go
@@ -167,13 +167,13 @@ type TaskArtifact struct {
 }
 
 type Template struct {
-	SourcePath    string
-	DestPath      string
-	EmbededTmpl   string
-	ChangeMode    string
-	RestartSignal string
-	Splay         time.Duration
-	Once          bool
+	SourcePath   string
+	DestPath     string
+	EmbeddedTmpl string
+	ChangeMode   string
+	ChangeSignal string
+	Splay        time.Duration
+	Once         bool
 }
 
 type Vault struct {
@@ -248,25 +248,30 @@ const (
 	TaskDiskExceeded       = "Disk Exceeded"
 	TaskVaultRenewalFailed = "Vault token renewal failed"
 	TaskSiblingFailed      = "Sibling task failed"
+	TaskSignaling          = "Signaling"
+	TaskRestartSignal      = "Restart Signaled"
 )
 
 // TaskEvent is an event that effects the state of a task and contains meta-data
 // appropriate to the events type.
 type TaskEvent struct {
-	Type            string
-	Time            int64
-	RestartReason   string
-	DriverError     string
-	ExitCode        int
-	Signal          int
-	Message         string
-	KillTimeout     time.Duration
-	KillError       string
-	StartDelay      int64
-	DownloadError   string
-	ValidationError string
-	DiskLimit       int64
-	DiskSize        int64
-	FailedSibling   string
-	VaultError      string
+	Type             string
+	Time             int64
+	RestartReason    string
+	DriverError      string
+	ExitCode         int
+	Signal           int
+	Message          string
+	KillReason       string
+	KillTimeout      time.Duration
+	KillError        string
+	StartDelay       int64
+	DownloadError    string
+	ValidationError  string
+	DiskLimit        int64
+	DiskSize         int64
+	FailedSibling    string
+	VaultError       string
+	TaskSignalReason string
+	TaskSignal       string
 }
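Reviewer note: for api consumers the rename is mechanical. A quick sketch of the updated struct in use, assuming the package's usual import path (field values and the embedded template are illustrative, not taken from this PR):

```go
package main

import (
	"fmt"
	"time"

	"github.com/hashicorp/nomad/api"
)

func main() {
	// Illustrative only: a template that re-renders and signals the task,
	// using the corrected EmbeddedTmpl and the renamed ChangeSignal field.
	tmpl := api.Template{
		EmbeddedTmpl: `key: {{ key "config/mykey" }}`,
		DestPath:     "local/config.yml",
		ChangeMode:   "signal",
		ChangeSignal: "SIGHUP",
		Splay:        5 * time.Second,
	}
	fmt.Printf("%+v\n", tmpl)
}
```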
diff --git a/client/consul_template.go b/client/consul_template.go
index 0103a5055..f306306ba 100644
--- a/client/consul_template.go
+++ b/client/consul_template.go
@@ -26,17 +26,17 @@ var (
 // TaskHooks is an interface which provides hooks into the tasks life-cycle
 type TaskHooks interface {
 	// Restart is used to restart the task
-	Restart()
+	Restart(source, reason string)
 
 	// Signal is used to signal the task
-	Signal(os.Signal)
+	Signal(source, reason string, s os.Signal)
 
 	// UnblockStart is used to unblock the starting of the task. This should be
 	// called after prestart work is completed
-	UnblockStart()
+	UnblockStart(source string)
 
-	// Kill is used to kill the task because of the passed error.
-	Kill(error)
+	// Kill is used to kill the task for the given source and reason.
+	Kill(source, reason string)
 }
 
 // TaskTemplateManager is used to run a set of templates for a given task
@@ -141,6 +141,11 @@ func (tm *TaskTemplateManager) Stop() {
 // run is the long lived loop that handles errors and templates being rendered
 func (tm *TaskTemplateManager) run() {
 	if tm.runner == nil {
+		// Unblock the start if there is nothing to do
+		if !tm.allRendered {
+			tm.hook.UnblockStart("consul-template")
+		}
+
 		return
 	}
 
@@ -164,7 +169,7 @@ func (tm *TaskTemplateManager) run() {
 				continue
 			}
 
-			tm.hook.Kill(err)
+			tm.hook.Kill("consul-template", err.Error())
 		case <-tm.runner.TemplateRenderedCh():
 			// A template has been rendered, figure out what to do
 			events := tm.runner.RenderEvents()
@@ -186,7 +191,7 @@ func (tm *TaskTemplateManager) run() {
 		}
 
 		allRenderedTime = time.Now()
-		tm.hook.UnblockStart()
+		tm.hook.UnblockStart("consul-template")
 	}
 
 	// If all our templates are change mode no-op, then we can exit here
@@ -207,7 +212,7 @@ func (tm *TaskTemplateManager) run() {
 				continue
 			}
 
-			tm.hook.Kill(err)
+			tm.hook.Kill("consul-template", err.Error())
 		case <-tm.runner.TemplateRenderedCh():
 			// A template has been rendered, figure out what to do
 			var handling []string
@@ -232,7 +237,7 @@ func (tm *TaskTemplateManager) run() {
 			// Lookup the template and determine what to do
 			tmpls, ok := tm.lookup[id]
 			if !ok {
-				tm.hook.Kill(fmt.Errorf("consul-template runner returned unknown template id %q", id))
+				tm.hook.Kill("consul-template", fmt.Sprintf("consul-template runner returned unknown template id %q", id))
 				return
 			}
 
@@ -270,10 +275,10 @@ func (tm *TaskTemplateManager) run() {
 			}
 
 			if restart {
-				tm.hook.Restart()
+				tm.hook.Restart("consul-template", "template with change_mode restart re-rendered")
 			} else if len(signals) != 0 {
 				for signal := range signals {
-					tm.hook.Signal(tm.signals[signal])
+					tm.hook.Signal("consul-template", "template re-rendered", tm.signals[signal])
 				}
 			}
 		}
diff --git a/client/consul_template_test.go b/client/consul_template_test.go
index 08f73c1e1..31ea8a7b8 100644
--- a/client/consul_template_test.go
+++ b/client/consul_template_test.go
@@ -29,7 +29,7 @@ type MockTaskHooks struct {
 	UnblockCh chan struct{}
 	Unblocked bool
 
-	KillError error
+	KillReason string
 }
 
 func NewMockTaskHooks() *MockTaskHooks {
@@ -39,7 +39,7 @@ func NewMockTaskHooks() *MockTaskHooks {
 		SignalCh:  make(chan struct{}, 1),
 	}
 }
-func (m *MockTaskHooks) Restart() {
+func (m *MockTaskHooks) Restart(source, reason string) {
 	m.Restarts++
 	select {
 	case m.RestartCh <- struct{}{}:
@@ -47,7 +47,7 @@ func (m *MockTaskHooks) Restart() {
 	}
 }
 
-func (m *MockTaskHooks) Signal(s os.Signal) {
+func (m *MockTaskHooks) Signal(source, reason string, s os.Signal) {
 	m.Signals = append(m.Signals, s)
 	select {
 	case m.SignalCh <- struct{}{}:
@@ -55,8 +55,8 @@ func (m *MockTaskHooks) Signal(s os.Signal) {
 	}
 }
 
-func (m *MockTaskHooks) Kill(e error)  { m.KillError = e }
-func (m *MockTaskHooks) UnblockStart() {
+func (m *MockTaskHooks) Kill(source, reason string) { m.KillReason = reason }
+func (m *MockTaskHooks) UnblockStart(source string) {
 	if !m.Unblocked {
 		close(m.UnblockCh)
 	}
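A note on the Signal hook calls above: `signals` is a set keyed by signal name, so several re-rendered templates that share a change_signal collapse to one delivery per signal. A standalone sketch of that coalescing idea (names are illustrative, not the manager's internals):

```go
package main

import "fmt"

// coalesceSignals deduplicates the signals requested by a batch of
// re-rendered templates so each distinct signal is delivered once.
func coalesceSignals(rendered [][]string) []string {
	set := make(map[string]struct{})
	for _, signals := range rendered {
		for _, s := range signals {
			set[s] = struct{}{}
		}
	}
	out := make([]string, 0, len(set))
	for s := range set {
		out = append(out, s)
	}
	return out
}

func main() {
	// Two templates both ask for SIGHUP; the task should be signalled once.
	fmt.Println(coalesceSignals([][]string{{"SIGHUP"}, {"SIGHUP", "SIGUSR1"}}))
}
```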
diff --git a/client/restarts.go b/client/restarts.go
index 3fbffc4e0..1c6a6b843 100644
--- a/client/restarts.go
+++ b/client/restarts.go
@@ -34,15 +34,16 @@ func newRestartTracker(policy *structs.RestartPolicy, jobType string) *RestartTracker {
 }
 
 type RestartTracker struct {
-	waitRes   *cstructs.WaitResult
-	startErr  error
-	count     int       // Current number of attempts.
-	onSuccess bool      // Whether to restart on successful exit code.
-	startTime time.Time // When the interval began
-	reason    string    // The reason for the last state
-	policy    *structs.RestartPolicy
-	rand      *rand.Rand
-	lock      sync.Mutex
+	waitRes          *cstructs.WaitResult
+	startErr         error
+	restartTriggered bool      // Whether the task has been signalled to be restarted
+	count            int       // Current number of attempts.
+	onSuccess        bool      // Whether to restart on successful exit code.
+	startTime        time.Time // When the interval began
+	reason           string    // The reason for the last state
+	policy           *structs.RestartPolicy
+	rand             *rand.Rand
+	lock             sync.Mutex
 }
 
 // SetPolicy updates the policy used to determine restarts.
@@ -69,6 +70,15 @@ func (r *RestartTracker) SetWaitResult(res *cstructs.WaitResult) *RestartTracker {
 	return r
 }
 
+// SetRestartTriggered is used to mark that the task has been signalled to be
+// restarted
+func (r *RestartTracker) SetRestartTriggered() *RestartTracker {
+	r.lock.Lock()
+	defer r.lock.Unlock()
+	r.restartTriggered = true
+	return r
+}
+
 // GetReason returns a human-readable description for the last state returned by
 // GetState.
 func (r *RestartTracker) GetReason() string {
@@ -111,13 +121,25 @@ func (r *RestartTracker) GetState() (string, time.Duration) {
 		r.startTime = now
 	}
 
+	var state string
+	var dur time.Duration
 	if r.startErr != nil {
-		return r.handleStartError()
+		state, dur = r.handleStartError()
 	} else if r.waitRes != nil {
-		return r.handleWaitResult()
+		state, dur = r.handleWaitResult()
+	} else if r.restartTriggered {
+		state, dur = structs.TaskRestarting, 0
+		r.reason = ""
 	} else {
-		return "", 0
+		state, dur = "", 0
 	}
+
+	// Clear out the existing state
+	r.startErr = nil
+	r.waitRes = nil
+	r.restartTriggered = false
+
+	return state, dur
 }
 
 // handleStartError returns the new state and potential wait duration for
diff --git a/client/restarts_test.go b/client/restarts_test.go
index 0a15e2285..86960b1f7 100644
--- a/client/restarts_test.go
+++ b/client/restarts_test.go
@@ -94,6 +94,16 @@ func TestClient_RestartTracker_ZeroAttempts(t *testing.T) {
 	}
 }
 
+func TestClient_RestartTracker_RestartTriggered(t *testing.T) {
+	t.Parallel()
+	p := testPolicy(true, structs.RestartPolicyModeFail)
+	p.Attempts = 0
+	rt := newRestartTracker(p, structs.JobTypeService)
+	if state, when := rt.SetRestartTriggered().GetState(); state != structs.TaskRestarting || when != 0 {
+		t.Fatalf("expect restart immediately, got %v %v", state, when)
+	}
+}
+
 func TestClient_RestartTracker_StartError_Recoverable_Fail(t *testing.T) {
 	t.Parallel()
 	p := testPolicy(true, structs.RestartPolicyModeFail)
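The new GetState branch makes the tracker one-shot: whatever triggered the call (start error, wait result, or restart trigger) is cleared before returning. A trimmed-down illustration of that contract (a stand-in type, not the client's RestartTracker):

```go
package main

import (
	"fmt"
	"sync"
)

type tracker struct {
	lock             sync.Mutex
	restartTriggered bool
}

func (t *tracker) SetRestartTriggered() *tracker {
	t.lock.Lock()
	defer t.lock.Unlock()
	t.restartTriggered = true
	return t
}

func (t *tracker) GetState() string {
	t.lock.Lock()
	defer t.lock.Unlock()
	state := ""
	if t.restartTriggered {
		state = "restarting"
	}
	// Clear out the existing state so the next call starts fresh.
	t.restartTriggered = false
	return state
}

func main() {
	t := &tracker{}
	fmt.Println(t.SetRestartTriggered().GetState()) // "restarting"
	fmt.Println(t.GetState())                       // "" because the trigger was consumed
}
```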
diff --git a/client/task_runner.go b/client/task_runner.go
index 3f3412d29..4367e8dab 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -69,6 +69,28 @@ type TaskRunner struct {
 	vaultToken     string
 	vaultRenewalCh <-chan error
 
+	// templateManager is used to manage any consul-templates this task may have
+	templateManager *TaskTemplateManager
+
+	// templatesRendered marks whether the templates have been rendered
+	templatesRendered bool
+
+	// unblockCh is used to unblock the starting of the task
+	unblockCh   chan struct{}
+	unblocked   bool
+	unblockLock sync.Mutex
+
+	// restartCh is used to restart a task
+	restartCh chan *structs.TaskEvent
+
+	// signalCh is used to send a signal to a task
+	signalCh chan SignalEvent
+
+	// killCh is used to kill a task
+	killCh   chan *structs.TaskEvent
+	killed   bool
+	killLock sync.Mutex
+
 	destroy     bool
 	destroyCh   chan struct{}
 	destroyLock sync.Mutex
@@ -85,11 +107,18 @@ type taskRunnerState struct {
 	Task               *structs.Task
 	HandleID           string
 	ArtifactDownloaded bool
+	TemplatesRendered  bool
 }
 
 // TaskStateUpdater is used to signal that tasks state has changed.
 type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent)
 
+// SignalEvent is a tuple of the signal and the event generating it
+type SignalEvent struct {
+	s os.Signal
+	e *structs.TaskEvent
+}
+
 // NewTaskRunner is used to create a new task context
 func NewTaskRunner(logger *log.Logger, config *config.Config,
 	updater TaskStateUpdater, ctx *driver.ExecContext,
@@ -117,6 +146,10 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
 		updateCh:  make(chan *structs.Allocation, 64),
 		destroyCh: make(chan struct{}),
 		waitCh:    make(chan struct{}),
+		unblockCh: make(chan struct{}),
+		restartCh: make(chan *structs.TaskEvent),
+		signalCh:  make(chan SignalEvent),
+		killCh:    make(chan *structs.TaskEvent),
 	}
 
 	return tc
@@ -167,6 +200,7 @@ func (r *TaskRunner) RestoreState() error {
 		r.task = snap.Task
 	}
 	r.artifactsDownloaded = snap.ArtifactDownloaded
+	r.templatesRendered = snap.TemplatesRendered
 
 	if err := r.setTaskEnv(); err != nil {
 		return fmt.Errorf("client: failed to create task environment for task %q in allocation %q: %v",
@@ -208,6 +242,7 @@ func (r *TaskRunner) SaveState() error {
 		Task:               r.task,
 		Version:            r.config.Version,
 		ArtifactDownloaded: r.artifactsDownloaded,
+		TemplatesRendered:  r.templatesRendered,
 	}
 	r.handleLock.Lock()
 	if r.handle != nil {
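SignalEvent keeps the os.Signal and the TaskEvent describing it in one message, so the state update and the driver action cannot drift apart. A miniature of the channel plumbing (names simplified; this is a sketch, not the runner's real wiring):

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// signalEvent pairs a signal with the reason it was requested, mirroring the
// shape of the runner's SignalEvent.
type signalEvent struct {
	s      os.Signal
	reason string
}

func main() {
	signalCh := make(chan signalEvent, 1)

	// Producer side: roughly what Signal() does after checking the task is running.
	signalCh <- signalEvent{s: syscall.SIGHUP, reason: "consul-template: template re-rendered"}

	// Consumer side: roughly what run()'s select does with the tuple.
	se := <-signalCh
	fmt.Printf("signalling task with %v: %s\n", se.s, se.reason)
}
```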
@@ -315,24 +350,23 @@ func (r *TaskRunner) validateTask() error {
 	return mErr.ErrorOrNil()
 }
 
-func (r *TaskRunner) run() {
-	// Predeclare things so we can jump to the RESTART
-	var handleEmpty bool
-	var stopCollection chan struct{}
+// prestart handles life-cycle tasks that occur before the task has started.
+func (r *TaskRunner) prestart(taskDir string) (success bool) {
+	// Build the template manager
+	var err error
+	r.templateManager, err = NewTaskTemplateManager(r, r.task.Templates, r.templatesRendered,
+		r.config, r.vaultToken, taskDir, r.taskEnv)
+	if err != nil {
+		err := fmt.Errorf("failed to build task's template manager: %v", err)
+		r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
+		r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
+		return
+	}
 
 	for {
 		// Download the task's artifacts
 		if !r.artifactsDownloaded && len(r.task.Artifacts) > 0 {
 			r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
-			taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name]
-			if !ok {
-				err := fmt.Errorf("task directory couldn't be found")
-				r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
-				r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
-				r.restartTracker.SetStartError(err)
-				goto RESTART
-			}
-
 			for _, artifact := range r.task.Artifacts {
 				if err := getter.GetArtifact(r.taskEnv, artifact, taskDir); err != nil {
 					r.setState(structs.TaskStatePending,
@@ -345,6 +379,67 @@ func (r *TaskRunner) run() {
 			r.artifactsDownloaded = true
 		}
 
+		// We don't have to wait
+		if r.templatesRendered {
+			return true
+		}
+
+		// Block for consul-template
+		select {
+		case <-r.unblockCh:
+			r.templatesRendered = true
+			return true
+		case event := <-r.killCh:
+			r.setState(structs.TaskStateDead, event)
+			r.logger.Printf("[ERR] client: task killed: %v", event)
+			return false
+		case update := <-r.updateCh:
+			if err := r.handleUpdate(update); err != nil {
+				r.logger.Printf("[ERR] client: update to task %q failed: %v", r.task.Name, err)
+			}
+		case err := <-r.vaultRenewalCh:
+			if err == nil {
+				continue // Only handle once.
+			}
+
+			// This is a fatal error as the task is not valid if it requested a
+			// Vault token and the token has now expired.
+			r.logger.Printf("[WARN] client: vault token for task %q not renewed: %v", r.task.Name, err)
+			r.Destroy(structs.NewTaskEvent(structs.TaskVaultRenewalFailed).SetVaultRenewalError(err))
+
+		case <-r.destroyCh:
+			r.setState(structs.TaskStateDead, r.destroyEvent)
+			return false
+		}
+
+	RESTART:
+		restart := r.shouldRestart()
+		if !restart {
+			return false
+		}
+	}
+}
+
+func (r *TaskRunner) run() {
+	// Predeclare things so we can jump to the RESTART
+	var handleEmpty bool
+	var stopCollection chan struct{}
+
+	// Get the task directory
+	taskDir, ok := r.ctx.AllocDir.TaskDirs[r.task.Name]
+	if !ok {
+		err := fmt.Errorf("task directory couldn't be found")
+		r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(err))
+		r.logger.Printf("[ERR] client: task directory for alloc %q task %q couldn't be found", r.alloc.ID, r.task.Name)
+		return
+	}
+
+	// Do all prestart events first
+	if success := r.prestart(taskDir); !success {
+		return
+	}
+
+	for {
 		// Start the task if not yet started or it is being forced. This logic
 		// is necessary because in the case of a restore the handle already
 		// exists.
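prestart now gates task start on the template manager: the select blocks until unblockCh closes or a kill/destroy arrives. A runnable miniature of that gate (channel types simplified; the real select also watches updates and Vault renewal):

```go
package main

import (
	"fmt"
	"time"
)

// waitForPrestart blocks until either the unblock channel closes (templates
// rendered) or a kill reason arrives, mirroring the prestart select above.
func waitForPrestart(unblockCh <-chan struct{}, killCh <-chan string) bool {
	select {
	case <-unblockCh:
		return true // safe to start the task
	case reason := <-killCh:
		fmt.Println("task killed before start:", reason)
		return false
	}
}

func main() {
	unblockCh := make(chan struct{})
	killCh := make(chan string)

	go func() {
		// Simulates consul-template finishing its first render.
		time.Sleep(10 * time.Millisecond)
		close(unblockCh)
	}()

	fmt.Println("started:", waitForPrestart(unblockCh, killCh))
}
```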
@@ -413,75 +508,41 @@ func (r *TaskRunner) run() {
 				r.logger.Printf("[WARN] client: vault token for task %q not renewed: %v", r.task.Name, err)
 				r.Destroy(structs.NewTaskEvent(structs.TaskVaultRenewalFailed).SetVaultRenewalError(err))
 
+			case se := <-r.signalCh:
+				r.logger.Printf("[DEBUG] client: task being signalled with %v: %s", se.s, se.e.TaskSignalReason)
+				r.setState(structs.TaskStateRunning, se.e)
+
+				// TODO need an interface on the driver
+
+			case event := <-r.restartCh:
+				r.logger.Printf("[DEBUG] client: task being restarted: %s", event.RestartReason)
+				r.setState(structs.TaskStateRunning, event)
+				r.killTask(event.RestartReason, stopCollection)
+
+				// Since the restart isn't from a failure, restart immediately
+				// and don't count against the restart policy
+				r.restartTracker.SetRestartTriggered()
+				break WAIT
+
+			case event := <-r.killCh:
+				r.logger.Printf("[ERR] client: task being killed: %s", event.KillReason)
+				r.killTask(event.KillReason, stopCollection)
+				return
+
 			case <-r.destroyCh:
 				// Store the task event that provides context on the task destroy.
 				if r.destroyEvent.Type != structs.TaskKilled {
 					r.setState(structs.TaskStateRunning, r.destroyEvent)
 				}
 
-				// Mark that we received the kill event
-				timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
-				r.setState(structs.TaskStateRunning,
-					structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout))
-
-				// Kill the task using an exponential backoff in-case of failures.
-				destroySuccess, err := r.handleDestroy()
-				if !destroySuccess {
-					// We couldn't successfully destroy the resource created.
-					r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
-				}
-
-				// Stop collection of the task's resource usage
-				close(stopCollection)
-
-				// Store that the task has been destroyed and any associated error.
-				r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
-
-				r.runningLock.Lock()
-				r.running = false
-				r.runningLock.Unlock()
-
+				r.killTask("", stopCollection)
 				return
 			}
 		}
 
 	RESTART:
-		state, when := r.restartTracker.GetState()
-		r.restartTracker.SetStartError(nil).SetWaitResult(nil)
-		reason := r.restartTracker.GetReason()
-		switch state {
-		case structs.TaskNotRestarting, structs.TaskTerminated:
-			r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
-			if state == structs.TaskNotRestarting {
-				r.setState(structs.TaskStateDead,
-					structs.NewTaskEvent(structs.TaskNotRestarting).
-						SetRestartReason(reason))
-			}
-			return
-		case structs.TaskRestarting:
-			r.logger.Printf("[INFO] client: Restarting task %q for alloc %q in %v", r.task.Name, r.alloc.ID, when)
-			r.setState(structs.TaskStatePending,
-				structs.NewTaskEvent(structs.TaskRestarting).
-					SetRestartDelay(when).
-					SetRestartReason(reason))
-		default:
-			r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state)
-			return
-		}
-
-		// Sleep but watch for destroy events.
-		select {
-		case <-time.After(when):
-		case <-r.destroyCh:
-		}
-
-		// Destroyed while we were waiting to restart, so abort.
-		r.destroyLock.Lock()
-		destroyed := r.destroy
-		r.destroyLock.Unlock()
-		if destroyed {
-			r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed due to: %s", r.task.Name, r.destroyEvent.Message)
-			r.setState(structs.TaskStateDead, r.destroyEvent)
+		restart := r.shouldRestart()
+		if !restart {
 			return
 		}
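The restartCh case relies on a labeled break to fall out of the WAIT loop into the RESTART handling without treating the event as a failure. A self-contained sketch of that pattern (loop body heavily simplified):

```go
package main

import "fmt"

func main() {
	restartCh := make(chan string, 1)
	restartCh <- "consul-template: template with change_mode restart re-rendered"

	restartTriggered := false

WAIT:
	for {
		select {
		case reason := <-restartCh:
			fmt.Println("restarting:", reason)
			// Since the restart isn't from a failure, restart immediately
			// and don't count against the restart policy.
			restartTriggered = true
			break WAIT // exits the loop, not just the select
		}
	}

	fmt.Println("restart triggered:", restartTriggered)
}
```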
@@ -493,6 +554,85 @@ func (r *TaskRunner) run() {
 	}
 }
 
+// shouldRestart returns if the task should restart. If the return value is
+// true, the task's restart policy has already been considered and any wait time
+// between restarts has been applied.
+func (r *TaskRunner) shouldRestart() bool {
+	state, when := r.restartTracker.GetState()
+	reason := r.restartTracker.GetReason()
+	switch state {
+	case structs.TaskNotRestarting, structs.TaskTerminated:
+		r.logger.Printf("[INFO] client: Not restarting task: %v for alloc: %v ", r.task.Name, r.alloc.ID)
+		if state == structs.TaskNotRestarting {
+			r.setState(structs.TaskStateDead,
+				structs.NewTaskEvent(structs.TaskNotRestarting).
+					SetRestartReason(reason))
+		}
+		return false
+	case structs.TaskRestarting:
+		r.logger.Printf("[INFO] client: Restarting task %q for alloc %q in %v", r.task.Name, r.alloc.ID, when)
+		r.setState(structs.TaskStatePending,
+			structs.NewTaskEvent(structs.TaskRestarting).
+				SetRestartDelay(when).
+				SetRestartReason(reason))
+	default:
+		r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state)
+		return false
+	}
+
+	// Sleep but watch for destroy events.
+	select {
+	case <-time.After(when):
+	case <-r.destroyCh:
+	}
+
+	// Destroyed while we were waiting to restart, so abort.
+	r.destroyLock.Lock()
+	destroyed := r.destroy
+	r.destroyLock.Unlock()
+	if destroyed {
+		r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed due to: %s", r.task.Name, r.destroyEvent.Message)
+		r.setState(structs.TaskStateDead, r.destroyEvent)
+		return false
+	}
+
+	return true
+}
+
+// killTask kills the running task, storing the reason in the Killing TaskEvent.
+// The associated stats collection channel is also closed once the task is
+// successfully killed.
+func (r *TaskRunner) killTask(reason string, statsCh chan struct{}) {
+	r.runningLock.Lock()
+	running := r.running
+	r.runningLock.Unlock()
+	if !running {
+		return
+	}
+
+	// Mark that we received the kill event
+	timeout := driver.GetKillTimeout(r.task.KillTimeout, r.config.MaxKillTimeout)
+	r.setState(structs.TaskStateRunning,
+		structs.NewTaskEvent(structs.TaskKilling).SetKillTimeout(timeout).SetKillReason(reason))
+
+	// Kill the task using an exponential backoff in case of failures.
+	destroySuccess, err := r.handleDestroy()
+	if !destroySuccess {
+		// We couldn't successfully destroy the resource created.
+		r.logger.Printf("[ERR] client: failed to kill task %q. Resources may have been leaked: %v", r.task.Name, err)
+	}
+
+	r.runningLock.Lock()
+	r.running = false
+	r.runningLock.Unlock()
+
+	// Stop collection of the task's resource usage
+	close(statsCh)
+
+	// Store that the task has been destroyed and any associated error.
+	r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
+}
+
 // startTask creates the driver and starts the task.
 func (r *TaskRunner) startTask() error {
 	// Create a driver
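killTask is a no-op for non-running tasks and funnels every kill path through the same timeout, backoff destroy, and stats-channel shutdown. The backoff itself lives in handleDestroy, which this diff doesn't show; a hedged sketch of that retry idea (the real method calls the driver handle, the helper below is hypothetical):

```go
package main

import (
	"fmt"
	"time"
)

// destroyWithBackoff retries a kill with an exponentially growing delay
// instead of failing on the first transient error.
func destroyWithBackoff(kill func() error, attempts int) error {
	backoff := 50 * time.Millisecond
	var err error
	for i := 0; i < attempts; i++ {
		if err = kill(); err == nil {
			return nil
		}
		time.Sleep(backoff)
		backoff *= 2 // exponential backoff in case of failures
	}
	return fmt.Errorf("failed to destroy task after %d attempts: %v", attempts, err)
}

func main() {
	calls := 0
	err := destroyWithBackoff(func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("transient driver error")
		}
		return nil
	}, 5)
	fmt.Println("calls:", calls, "err:", err)
}
```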
@@ -637,6 +777,88 @@ func (r *TaskRunner) handleDestroy() (destroyed bool, err error) {
 	return
 }
 
+// Restart will restart the task
+func (r *TaskRunner) Restart(source, reason string) {
+	reasonStr := fmt.Sprintf("%s: %s", source, reason)
+	event := structs.NewTaskEvent(structs.TaskRestartSignal).SetRestartReason(reasonStr)
+
+	r.logger.Printf("[DEBUG] client: restarting task %v for alloc %q: %v",
+		r.task.Name, r.alloc.ID, reasonStr)
+
+	r.runningLock.Lock()
+	running := r.running
+	r.runningLock.Unlock()
+
+	// Drop the restart event
+	if !running {
+		r.logger.Printf("[DEBUG] client: skipping restart since task isn't running")
+		return
+	}
+
+	select {
+	case r.restartCh <- event:
+	case <-r.waitCh:
+	}
+}
+
+// Signal will send a signal to the task
+func (r *TaskRunner) Signal(source, reason string, s os.Signal) {
+	reasonStr := fmt.Sprintf("%s: %s", source, reason)
+	event := structs.NewTaskEvent(structs.TaskSignaling).SetTaskSignal(s).SetTaskSignalReason(reasonStr)
+
+	r.logger.Printf("[DEBUG] client: sending signal %v to task %v for alloc %q", s, r.task.Name, r.alloc.ID)
+
+	r.runningLock.Lock()
+	running := r.running
+	r.runningLock.Unlock()
+
+	// Drop the signal event
+	if !running {
+		r.logger.Printf("[DEBUG] client: skipping signal since task isn't running")
+		return
+	}
+
+	select {
+	case r.signalCh <- SignalEvent{s: s, e: event}:
+	case <-r.waitCh:
+	}
+}
+
+// Kill will kill a task and store the kill reason, no longer restarting the task
+func (r *TaskRunner) Kill(source, reason string) {
+	r.killLock.Lock()
+	defer r.killLock.Unlock()
+	if r.killed {
+		return
+	}
+	r.killed = true
+
+	reasonStr := fmt.Sprintf("%s: %s", source, reason)
+	event := structs.NewTaskEvent(structs.TaskKilling).SetKillReason(reasonStr)
+
+	r.logger.Printf("[DEBUG] client: killing task %v for alloc %q: %v", r.task.Name, r.alloc.ID, reasonStr)
+
+	select {
+	case r.killCh <- event:
+		close(r.killCh)
+	case <-r.waitCh:
+	}
+}
+
+// UnblockStart unblocks the starting of the task. It currently assumes only
+// consul-template will unblock
+func (r *TaskRunner) UnblockStart(source string) {
+	r.unblockLock.Lock()
+	defer r.unblockLock.Unlock()
+	if r.unblocked {
+		return
+	}
+
+	r.logger.Printf("[DEBUG] client: unblocking task %v for alloc %q: %v", r.task.Name, r.alloc.ID, source)
+	close(r.unblockCh)
+}
+
 // Helper function for converting a WaitResult into a TaskTerminated event.
 func (r *TaskRunner) waitErrorToEvent(res *dstructs.WaitResult) *structs.TaskEvent {
 	return structs.NewTaskEvent(structs.TaskTerminated).
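These four methods are the new public surface other client components call into. A stand-in showing just the call shapes (Runner here is a mock, not the client's TaskRunner):

```go
package main

import (
	"fmt"
	"os"
	"syscall"
)

// Runner mirrors the TaskRunner method signatures added above so the call
// shapes can be shown in isolation.
type Runner struct{}

func (r *Runner) Restart(source, reason string) {
	fmt.Printf("restart requested (%s: %s)\n", source, reason)
}

func (r *Runner) Signal(source, reason string, s os.Signal) {
	fmt.Printf("signal %v requested (%s: %s)\n", s, source, reason)
}

func (r *Runner) Kill(source, reason string) {
	fmt.Printf("kill requested (%s: %s)\n", source, reason)
}

func main() {
	var tr Runner
	tr.Restart("api", "user requested restart")
	tr.Signal("api", "config reloaded", syscall.SIGHUP)
	tr.Kill("api", "user requested stop")
}
```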
diff --git a/client/task_runner_test.go b/client/task_runner_test.go
index 9b9997f23..29d8f0f2d 100644
--- a/client/task_runner_test.go
+++ b/client/task_runner_test.go
@@ -471,3 +471,127 @@ func TestTaskRunner_VaultTokenRenewal(t *testing.T) {
 		t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskKilled)
 	}
 }
+
+func TestTaskRunner_RestartTask(t *testing.T) {
+	alloc := mock.Alloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"exit_code": "0",
+		"run_for":   "10s",
+	}
+
+	upd, tr := testTaskRunnerFromAlloc(true, alloc)
+	tr.MarkReceived()
+	go tr.Run()
+	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
+	defer tr.ctx.AllocDir.Destroy()
+
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		tr.Restart("test", "restart")
+		time.Sleep(100 * time.Millisecond)
+		tr.Kill("test", "restart")
+	}()
+
+	select {
+	case <-tr.WaitCh():
+	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
+		t.Fatalf("timeout")
+	}
+
+	if len(upd.events) != 9 {
+		t.Fatalf("should have 9 updates: %#v", upd.events)
+	}
+
+	if upd.state != structs.TaskStateDead {
+		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
+	}
+
+	if upd.events[0].Type != structs.TaskReceived {
+		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
+	}
+
+	if upd.events[1].Type != structs.TaskStarted {
+		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
+	}
+
+	if upd.events[2].Type != structs.TaskRestartSignal {
+		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskRestartSignal)
+	}
+
+	if upd.events[3].Type != structs.TaskKilling {
+		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilling)
+	}
+
+	if upd.events[4].Type != structs.TaskKilled {
+		t.Fatalf("Fifth Event was %v; want %v", upd.events[4].Type, structs.TaskKilled)
+	}
+
+	t.Logf("%+v", upd.events[5])
+	if upd.events[5].Type != structs.TaskRestarting {
+		t.Fatalf("Sixth Event was %v; want %v", upd.events[5].Type, structs.TaskRestarting)
+	}
+
+	if upd.events[6].Type != structs.TaskStarted {
+		t.Fatalf("Seventh Event was %v; want %v", upd.events[6].Type, structs.TaskStarted)
+	}
+
+	if upd.events[7].Type != structs.TaskKilling {
+		t.Fatalf("Eighth Event was %v; want %v", upd.events[7].Type, structs.TaskKilling)
+	}
+
+	if upd.events[8].Type != structs.TaskKilled {
+		t.Fatalf("Ninth Event was %v; want %v", upd.events[8].Type, structs.TaskKilled)
+	}
+}
+
+func TestTaskRunner_KillTask(t *testing.T) {
+	alloc := mock.Alloc()
+	task := alloc.Job.TaskGroups[0].Tasks[0]
+	task.Driver = "mock_driver"
+	task.Config = map[string]interface{}{
+		"exit_code": "0",
+		"run_for":   "10s",
+	}
+
+	upd, tr := testTaskRunnerFromAlloc(false, alloc)
+	tr.MarkReceived()
+	go tr.Run()
+	defer tr.Destroy(structs.NewTaskEvent(structs.TaskKilled))
+	defer tr.ctx.AllocDir.Destroy()
+
+	go func() {
+		time.Sleep(100 * time.Millisecond)
+		tr.Kill("test", "kill")
+	}()
+
+	select {
+	case <-tr.WaitCh():
+	case <-time.After(time.Duration(testutil.TestMultiplier()*15) * time.Second):
+		t.Fatalf("timeout")
+	}
+
+	if len(upd.events) != 4 {
+		t.Fatalf("should have 4 updates: %#v", upd.events)
+	}
+
+	if upd.state != structs.TaskStateDead {
+		t.Fatalf("TaskState %v; want %v", upd.state, structs.TaskStateDead)
+	}
+
+	if upd.events[0].Type != structs.TaskReceived {
+		t.Fatalf("First Event was %v; want %v", upd.events[0].Type, structs.TaskReceived)
+	}
+
+	if upd.events[1].Type != structs.TaskStarted {
+		t.Fatalf("Second Event was %v; want %v", upd.events[1].Type, structs.TaskStarted)
+	}
+
+	if upd.events[2].Type != structs.TaskKilling {
+		t.Fatalf("Third Event was %v; want %v", upd.events[2].Type, structs.TaskKilling)
+	}
+
+	if upd.events[3].Type != structs.TaskKilled {
+		t.Fatalf("Fourth Event was %v; want %v", upd.events[3].Type, structs.TaskKilled)
+	}
+}
diff --git a/command/alloc_status.go b/command/alloc_status.go
index 1f5c6bb61..4e2de1b9f 100644
--- a/command/alloc_status.go
+++ b/command/alloc_status.go
@@ -353,6 +353,25 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
 			} else {
 				desc = "Task's sibling failed"
 			}
+		case api.TaskSignaling:
+			sig := event.TaskSignal
+			reason := event.TaskSignalReason
+
+			if sig == "" && reason == "" {
+				desc = "Task being sent a signal"
+			} else if sig == "" {
+				desc = reason
+			} else if reason == "" {
+				desc = fmt.Sprintf("Task being sent signal %v", sig)
+			} else {
+				desc = fmt.Sprintf("Task being sent signal %v: %v", sig, reason)
+			}
+		case api.TaskRestartSignal:
+			if event.RestartReason != "" {
+				desc = event.RestartReason
+			} else {
+				desc = "Task signaled to restart"
+			}
 		}
 
 		// Reverse order so we are sorted by time
diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go
index d99326bf9..a7bd3cd71 100644
--- a/nomad/structs/diff_test.go
+++ b/nomad/structs/diff_test.go
@@ -3287,6 +3287,12 @@ func TestTaskDiff(t *testing.T) {
 							Old:  "",
 							New:  "bam3",
 						},
+						{
+							Type: DiffTypeAdded,
+							Name: "ChangeSignal",
+							Old:  "",
+							New:  "SIGHUP3",
+						},
 						{
 							Type: DiffTypeAdded,
 							Name: "DestPath",
@@ -3295,22 +3301,10 @@ func TestTaskDiff(t *testing.T) {
 						},
 						{
 							Type: DiffTypeAdded,
-							Name: "EmbededTmpl",
+							Name: "EmbeddedTmpl",
 							Old:  "",
 							New:  "baz3",
 						},
-						{
-							Type: DiffTypeAdded,
-							Name: "Once",
-							Old:  "",
-							New:  "true",
-						},
-						{
-							Type: DiffTypeAdded,
-							Name: "RestartSignal",
-							Old:  "",
-							New:  "SIGHUP3",
-						},
 						{
 							Type: DiffTypeAdded,
 							Name: "SourcePath",
@@ -3335,6 +3329,12 @@ func TestTaskDiff(t *testing.T) {
 							Old:  "bam2",
 							New:  "",
 						},
+						{
+							Type: DiffTypeDeleted,
+							Name: "ChangeSignal",
+							Old:  "SIGHUP2",
+							New:  "",
+						},
 						{
 							Type: DiffTypeDeleted,
 							Name: "DestPath",
@@ -3343,22 +3343,10 @@ func TestTaskDiff(t *testing.T) {
 						},
 						{
 							Type: DiffTypeDeleted,
-							Name: "EmbededTmpl",
+							Name: "EmbeddedTmpl",
 							Old:  "baz2",
 							New:  "",
 						},
-						{
-							Type: DiffTypeDeleted,
-							Name: "Once",
-							Old:  "false",
-							New:  "",
-						},
-						{
-							Type: DiffTypeDeleted,
-							Name: "RestartSignal",
-							Old:  "SIGHUP2",
-							New:  "",
-						},
 						{
 							Type: DiffTypeDeleted,
 							Name: "SourcePath",
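The TaskSignaling branch above has four fallbacks depending on which of the signal and reason are set. Mirrored as a standalone function so the outputs are easy to eyeball (the extraction is illustrative; the command keeps the switch inline):

```go
package main

import "fmt"

// signalingDesc reproduces the description selection for TaskSignaling events.
func signalingDesc(sig, reason string) string {
	switch {
	case sig == "" && reason == "":
		return "Task being sent a signal"
	case sig == "":
		return reason
	case reason == "":
		return fmt.Sprintf("Task being sent signal %v", sig)
	default:
		return fmt.Sprintf("Task being sent signal %v: %v", sig, reason)
	}
}

func main() {
	fmt.Println(signalingDesc("", ""))
	fmt.Println(signalingDesc("SIGHUP", ""))
	fmt.Println(signalingDesc("SIGHUP", "consul-template: template re-rendered"))
}
```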
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 0b53c9ee2..fff8b3e82 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -10,6 +10,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"os"
 	"path/filepath"
 	"reflect"
 	"regexp"
@@ -2223,7 +2224,7 @@ type Template struct {
 	DestPath string `mapstructure:"destination"`
 
 	// EmbeddedTmpl store the raw template. This is useful for smaller templates
-	// where they are embeded in the job file rather than sent as an artificat
+	// where they are embedded in the job file rather than sent as an artifact
 	EmbeddedTmpl string `mapstructure:"data"`
 
 	// ChangeMode indicates what should be done if the template is re-rendered
@@ -2261,7 +2262,7 @@ func (t *Template) Validate() error {
 
 	// Verify we have something to render
 	if t.SourcePath == "" && t.EmbeddedTmpl == "" {
-		multierror.Append(&mErr, fmt.Errorf("Must specify a source path or have an embeded template"))
+		multierror.Append(&mErr, fmt.Errorf("Must specify a source path or have an embedded template"))
 	}
 
 	// Verify we can render somewhere
@@ -2393,6 +2394,13 @@ const (
 	// restarted because it has exceeded its restart policy.
 	TaskNotRestarting = "Not Restarting"
 
+	// TaskRestartSignal indicates that the task has been signalled to be
+	// restarted
+	TaskRestartSignal = "Restart Signaled"
+
+	// TaskSignaling indicates that the task is being signalled.
+	TaskSignaling = "Signaling"
+
 	// TaskDownloadingArtifacts means the task is downloading the artifacts
 	// specified in the task.
 	TaskDownloadingArtifacts = "Downloading Artifacts"
@@ -2436,6 +2444,9 @@ type TaskEvent struct {
 	// Task Killed Fields.
 	KillError string // Error killing the task.
 
+	// KillReason is the reason the task was killed
+	KillReason string
+
 	// TaskRestarting fields.
 	StartDelay int64 // The sleep period before restarting the task in unix nanoseconds.
 
@@ -2457,6 +2468,12 @@ type TaskEvent struct {
 
 	// VaultError is the error from token renewal
 	VaultError string
+
+	// TaskSignalReason indicates the reason the task is being signalled.
+	TaskSignalReason string
+
+	// TaskSignal is the signal that was sent to the task
+	TaskSignal string
 }
 
 func (te *TaskEvent) GoString() string {
@@ -2510,6 +2527,11 @@ func (e *TaskEvent) SetKillError(err error) *TaskEvent {
 	return e
 }
 
+func (e *TaskEvent) SetKillReason(r string) *TaskEvent {
+	e.KillReason = r
+	return e
+}
+
 func (e *TaskEvent) SetRestartDelay(delay time.Duration) *TaskEvent {
 	e.StartDelay = int64(delay)
 	return e
@@ -2520,6 +2542,16 @@ func (e *TaskEvent) SetRestartReason(reason string) *TaskEvent {
 	return e
 }
 
+func (e *TaskEvent) SetTaskSignalReason(r string) *TaskEvent {
+	e.TaskSignalReason = r
+	return e
+}
+
+func (e *TaskEvent) SetTaskSignal(s os.Signal) *TaskEvent {
+	e.TaskSignal = s.String()
+	return e
+}
+
 func (e *TaskEvent) SetDownloadError(err error) *TaskEvent {
 	if err != nil {
 		e.DownloadError = err.Error()
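The new setters keep to the existing builder style: each returns the event so call sites can chain, as task_runner.go does when constructing kill and signal events. A stand-in type demonstrating the chain (structs.TaskEvent itself carries many more fields):

```go
package main

import (
	"fmt"
	"syscall"
)

// TaskEvent is a reduced stand-in showing why the setters return the event:
// call sites can build a fully described event in one chained expression.
type TaskEvent struct {
	Type             string
	TaskSignal       string
	TaskSignalReason string
}

func NewTaskEvent(t string) *TaskEvent { return &TaskEvent{Type: t} }

func (e *TaskEvent) SetTaskSignal(s syscall.Signal) *TaskEvent {
	e.TaskSignal = s.String()
	return e
}

func (e *TaskEvent) SetTaskSignalReason(r string) *TaskEvent {
	e.TaskSignalReason = r
	return e
}

func main() {
	event := NewTaskEvent("Signaling").
		SetTaskSignal(syscall.SIGHUP).
		SetTaskSignalReason("consul-template: template re-rendered")
	fmt.Printf("%+v\n", *event)
}
```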