diff --git a/api/tasks.go b/api/tasks.go index 79ddbe685..dfed4e6e6 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -506,6 +506,7 @@ const ( TaskRestartSignal = "Restart Signaled" TaskLeaderDead = "Leader Task Dead" TaskBuildingTaskDir = "Building Task Directory" + TaskGenericMessage = "Generic" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -533,4 +534,5 @@ type TaskEvent struct { VaultError string TaskSignalReason string TaskSignal string + GenericSource string } diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 3171b9f20..6fc8b4076 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -622,7 +622,7 @@ func (r *AllocRunner) setStatus(status, desc string) { // setTaskState is used to set the status of a task. If state is empty then the // event is appended but not synced with the server. The event may be omitted -func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) { +func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent, lazySync bool) { r.taskStatusLock.Lock() defer r.taskStatusLock.Unlock() taskState, ok := r.taskStates[taskName] @@ -643,10 +643,18 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv r.appendTaskEvent(taskState, event) } - if state == "" { + if lazySync { return } + // If the state hasn't been set use the existing state. + if state == "" { + state = taskState.State + if taskState.State == "" { + state = structs.TaskStatePending + } + } + switch state { case structs.TaskStateRunning: // Capture the start time if it is just starting diff --git a/client/consul_template.go b/client/consul_template.go index 2a887c877..751b308fd 100644 --- a/client/consul_template.go +++ b/client/consul_template.go @@ -48,6 +48,9 @@ type TaskHooks interface { // Kill is used to kill the task because of the passed error. If fail is set // to true, the task is marked as failed Kill(source, reason string, fail bool) + + // EmitEvent is used to emit an event to be stored in the tasks events. + EmitEvent(source, message string) } // TaskTemplateManager is used to run a set of templates for a given task diff --git a/client/consul_template_test.go b/client/consul_template_test.go index d3ef17b73..d89385802 100644 --- a/client/consul_template_test.go +++ b/client/consul_template_test.go @@ -43,14 +43,18 @@ type MockTaskHooks struct { KillReason string KillCh chan struct{} + + Events []string + EmitEventCh chan struct{} } func NewMockTaskHooks() *MockTaskHooks { return &MockTaskHooks{ - UnblockCh: make(chan struct{}, 1), - RestartCh: make(chan struct{}, 1), - SignalCh: make(chan struct{}, 1), - KillCh: make(chan struct{}, 1), + UnblockCh: make(chan struct{}, 1), + RestartCh: make(chan struct{}, 1), + SignalCh: make(chan struct{}, 1), + KillCh: make(chan struct{}, 1), + EmitEventCh: make(chan struct{}, 1), } } func (m *MockTaskHooks) Restart(source, reason string) { @@ -87,6 +91,14 @@ func (m *MockTaskHooks) UnblockStart(source string) { m.Unblocked = true } +func (m *MockTaskHooks) EmitEvent(source, message string) { + m.Events = append(m.Events, message) + select { + case m.EmitEventCh <- struct{}{}: + default: + } +} + // testHarness is used to test the TaskTemplateManager by spinning up // Consul/Vault as needed type testHarness struct { diff --git a/client/task_runner.go b/client/task_runner.go index 4e0ca6abd..97367e543 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -189,7 +189,7 @@ func (s *taskRunnerState) Hash() []byte { } // TaskStateUpdater is used to signal that tasks state has changed. -type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent) +type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent, lazySync bool) // SignalEvent is a tuple of the signal and the event generating it type SignalEvent struct { @@ -251,7 +251,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config, // MarkReceived marks the task as received. func (r *TaskRunner) MarkReceived() { - r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived)) + r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived), false) } // WaitCh returns a channel to wait for termination @@ -498,14 +498,14 @@ func (r *TaskRunner) DestroyState() error { } // setState is used to update the state of the task runner -func (r *TaskRunner) setState(state string, event *structs.TaskEvent) { +func (r *TaskRunner) setState(state string, event *structs.TaskEvent, lazySync bool) { // Persist our state to disk. if err := r.SaveState(); err != nil { r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err) } // Indicate the task has been updated. - r.updater(r.task.Name, state, event) + r.updater(r.task.Name, state, event, lazySync) } // createDriver makes a driver for the task @@ -515,7 +515,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) { eventEmitter := func(m string, args ...interface{}) { msg := fmt.Sprintf(m, args...) r.logger.Printf("[DEBUG] client: driver event for alloc %q: %s", r.alloc.ID, msg) - r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg)) + r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg), false) } driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, eventEmitter) @@ -537,7 +537,8 @@ func (r *TaskRunner) Run() { if err := r.validateTask(); err != nil { r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err).SetFailsTask()) + structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err).SetFailsTask(), + false) return } @@ -549,7 +550,8 @@ func (r *TaskRunner) Run() { e := fmt.Errorf("failed to create driver of task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err) r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask(), + false) return } @@ -560,7 +562,8 @@ func (r *TaskRunner) Run() { e := fmt.Errorf("failed to build task directory for %q: %v", r.task.Name, err) r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask(), + false) return } @@ -858,7 +861,9 @@ func (r *TaskRunner) updatedTokenHandler() { r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder) if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + r.setState(structs.TaskStateDead, + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), + false) r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err) r.Kill("vault", err.Error(), true) return @@ -893,7 +898,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res if err != nil { r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), + false) resultCh <- false return } @@ -901,7 +907,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res if err := os.MkdirAll(filepath.Dir(renderTo), 07777); err != nil { r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), + false) resultCh <- false return } @@ -909,7 +916,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res if err := ioutil.WriteFile(renderTo, decoded, 0777); err != nil { r.setState( structs.TaskStateDead, - structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), + false) resultCh <- false return } @@ -924,14 +932,14 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res // Download the task's artifacts if !downloaded && len(task.Artifacts) > 0 { - r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts)) + r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts), false) taskEnv := r.envBuilder.Build() for _, artifact := range task.Artifacts { if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil { wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err) r.logger.Printf("[DEBUG] client: %v", wrapped) r.setState(structs.TaskStatePending, - structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped)) + structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped), false) r.restartTracker.SetStartError(structs.WrapRecoverable(wrapped.Error(), err)) goto RESTART } @@ -961,7 +969,7 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder) if err != nil { err := fmt.Errorf("failed to build task's template manager: %v", err) - r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask()) + r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), false) r.logger.Printf("[ERR] client: alloc %q, task %q %v", alloc.ID, task.Name, err) resultCh <- false return @@ -1032,7 +1040,7 @@ func (r *TaskRunner) run() { case success := <-prestartResultCh: if !success { r.cleanup() - r.setState(structs.TaskStateDead, nil) + r.setState(structs.TaskStateDead, nil, false) return } case <-r.startCh: @@ -1044,12 +1052,12 @@ func (r *TaskRunner) run() { startErr := r.startTask() r.restartTracker.SetStartError(startErr) if startErr != nil { - r.setState("", structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr)) + r.setState("", structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr), true) goto RESTART } // Mark the task as started - r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) + r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted), false) r.runningLock.Lock() r.running = true r.runningLock.Unlock() @@ -1076,7 +1084,7 @@ func (r *TaskRunner) run() { // Log whether the task was successful or not. r.restartTracker.SetWaitResult(waitRes) - r.setState("", r.waitErrorToEvent(waitRes)) + r.setState("", r.waitErrorToEvent(waitRes), true) if !waitRes.Successful() { r.logger.Printf("[INFO] client: task %q for alloc %q failed: %v", r.task.Name, r.alloc.ID, waitRes) } else { @@ -1102,7 +1110,7 @@ func (r *TaskRunner) run() { } r.logger.Printf("[DEBUG] client: sending %s", common) - r.setState(structs.TaskStateRunning, se.e) + r.setState(structs.TaskStateRunning, se.e, false) res := r.handle.Signal(se.s) se.result <- res @@ -1118,7 +1126,7 @@ func (r *TaskRunner) run() { } r.logger.Printf("[DEBUG] client: restarting %s: %v", common, event.RestartReason) - r.setState(structs.TaskStateRunning, event) + r.setState(structs.TaskStateRunning, event, false) r.killTask(nil) close(stopCollection) @@ -1138,7 +1146,7 @@ func (r *TaskRunner) run() { r.runningLock.Unlock() if !running { r.cleanup() - r.setState(structs.TaskStateDead, r.destroyEvent) + r.setState(structs.TaskStateDead, r.destroyEvent, false) return } @@ -1155,7 +1163,7 @@ func (r *TaskRunner) run() { if r.destroyEvent.Type == structs.TaskKilling { killEvent = r.destroyEvent } else { - r.setState(structs.TaskStateRunning, r.destroyEvent) + r.setState(structs.TaskStateRunning, r.destroyEvent, false) } } @@ -1166,7 +1174,7 @@ func (r *TaskRunner) run() { <-handleWaitCh r.cleanup() - r.setState(structs.TaskStateDead, nil) + r.setState(structs.TaskStateDead, nil, false) return } } @@ -1176,7 +1184,7 @@ func (r *TaskRunner) run() { restart := r.shouldRestart() if !restart { r.cleanup() - r.setState(structs.TaskStateDead, nil) + r.setState(structs.TaskStateDead, nil, false) return } @@ -1240,7 +1248,8 @@ func (r *TaskRunner) shouldRestart() bool { if state == structs.TaskNotRestarting { r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting). - SetRestartReason(reason).SetFailsTask()) + SetRestartReason(reason).SetFailsTask(), + false) } return false case structs.TaskRestarting: @@ -1248,7 +1257,8 @@ func (r *TaskRunner) shouldRestart() bool { r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting). SetRestartDelay(when). - SetRestartReason(reason)) + SetRestartReason(reason), + false) default: r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state) return false @@ -1270,7 +1280,7 @@ func (r *TaskRunner) shouldRestart() bool { r.destroyLock.Unlock() if destroyed { r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed", r.task.Name) - r.setState(structs.TaskStateDead, r.destroyEvent) + r.setState(structs.TaskStateDead, r.destroyEvent, false) return false } @@ -1302,7 +1312,7 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) { event.SetKillTimeout(timeout) // Mark that we received the kill event - r.setState(structs.TaskStateRunning, event) + r.setState(structs.TaskStateRunning, event, false) handle := r.getHandle() @@ -1318,7 +1328,7 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) { r.runningLock.Unlock() // Store that the task has been destroyed and any associated error. - r.setState("", structs.NewTaskEvent(structs.TaskKilled).SetKillError(err)) + r.setState("", structs.NewTaskEvent(structs.TaskKilled).SetKillError(err), true) } // startTask creates the driver, task dir, and starts the task. @@ -1436,8 +1446,9 @@ func (r *TaskRunner) buildTaskDir(fsi cstructs.FSIsolation) error { // and the task dir is already built. The reason we call Build again is to // ensure that the task dir invariants are still held. if !built { - r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskSetup). - SetMessage(structs.TaskBuildingTaskDir)) + r.setState(structs.TaskStatePending, + structs.NewTaskEvent(structs.TaskSetup).SetMessage(structs.TaskBuildingTaskDir), + false) } chroot := config.DefaultChrootEnv @@ -1662,6 +1673,14 @@ func (r *TaskRunner) Kill(source, reason string, fail bool) { r.Destroy(event) } +func (r *TaskRunner) EmitEvent(source, message string) { + event := structs.NewTaskEvent(structs.TaskGenericMessage). + SetGenericSource(source).SetMessage(message) + r.setState("", event, false) + r.logger.Printf("[DEBUG] client: event from %q for task %q in alloc %q: %v", + source, r.task.Name, r.alloc.ID, message) +} + // UnblockStart unblocks the starting of the task. It currently assumes only // consul-template will unblock func (r *TaskRunner) UnblockStart(source string) { diff --git a/client/task_runner_test.go b/client/task_runner_test.go index 7e5f1151d..788556c81 100644 --- a/client/task_runner_test.go +++ b/client/task_runner_test.go @@ -41,7 +41,7 @@ type MockTaskStateUpdater struct { events []*structs.TaskEvent } -func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent) { +func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent, _ bool) { if state != "" { m.state = state } diff --git a/command/alloc_status.go b/command/alloc_status.go index 9094ee002..723029698 100644 --- a/command/alloc_status.go +++ b/command/alloc_status.go @@ -398,6 +398,9 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) { desc = event.DriverMessage case api.TaskLeaderDead: desc = "Leader Task in Group dead" + case api.TaskGenericMessage: + event.Type = event.GenericSource + desc = event.Message } // Reverse order so we are sorted by time diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 134a81a4c..fdbcc2b1d 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3539,6 +3539,9 @@ const ( // TaskLeaderDead indicates that the leader task within the has finished. TaskLeaderDead = "Leader Task Dead" + + // TaskGenericMessage is used by various subsystems to emit a message. + TaskGenericMessage = "Generic" ) // TaskEvent is an event that effects the state of a task and contains meta-data @@ -3600,6 +3603,9 @@ type TaskEvent struct { // DriverMessage indicates a driver action being taken. DriverMessage string + + // GenericSource is the source of a message. + GenericSource string } func (te *TaskEvent) GoString() string { @@ -3739,6 +3745,11 @@ func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent { return e } +func (e *TaskEvent) SetGenericSource(s string) *TaskEvent { + e.GenericSource = s + return e +} + // TaskArtifact is an artifact to download before running the task. type TaskArtifact struct { // GetterSource is the source to download an artifact using go-getter