Emit generic task events

This commit is contained in:
Alex Dadgar
2017-08-07 21:26:04 -07:00
parent b258b91379
commit 43d2c425d1
8 changed files with 97 additions and 39 deletions

View File

@@ -506,6 +506,7 @@ const (
TaskRestartSignal = "Restart Signaled"
TaskLeaderDead = "Leader Task Dead"
TaskBuildingTaskDir = "Building Task Directory"
TaskGenericMessage = "Generic"
)
// TaskEvent is an event that affects the state of a task and contains meta-data
@@ -533,4 +534,5 @@ type TaskEvent struct {
VaultError string
TaskSignalReason string
TaskSignal string
GenericSource string
}

View File

@@ -622,7 +622,7 @@ func (r *AllocRunner) setStatus(status, desc string) {
// setTaskState is used to set the status of a task. If state is empty then the
// event is appended but not synced with the server. The event may be omitted
func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent) {
func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEvent, lazySync bool) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()
taskState, ok := r.taskStates[taskName]
@@ -643,10 +643,18 @@ func (r *AllocRunner) setTaskState(taskName, state string, event *structs.TaskEv
r.appendTaskEvent(taskState, event)
}
if state == "" {
if lazySync {
return
}
// If the state hasn't been set use the existing state.
if state == "" {
state = taskState.State
if taskState.State == "" {
state = structs.TaskStatePending
}
}
switch state {
case structs.TaskStateRunning:
// Capture the start time if it is just starting

View File

@@ -48,6 +48,9 @@ type TaskHooks interface {
// Kill is used to kill the task because of the passed error. If fail is set
// to true, the task is marked as failed
Kill(source, reason string, fail bool)
// EmitEvent is used to emit an event to be stored in the tasks events.
EmitEvent(source, message string)
}
// TaskTemplateManager is used to run a set of templates for a given task

View File

@@ -43,14 +43,18 @@ type MockTaskHooks struct {
KillReason string
KillCh chan struct{}
Events []string
EmitEventCh chan struct{}
}
func NewMockTaskHooks() *MockTaskHooks {
return &MockTaskHooks{
UnblockCh: make(chan struct{}, 1),
RestartCh: make(chan struct{}, 1),
SignalCh: make(chan struct{}, 1),
KillCh: make(chan struct{}, 1),
UnblockCh: make(chan struct{}, 1),
RestartCh: make(chan struct{}, 1),
SignalCh: make(chan struct{}, 1),
KillCh: make(chan struct{}, 1),
EmitEventCh: make(chan struct{}, 1),
}
}
func (m *MockTaskHooks) Restart(source, reason string) {
@@ -87,6 +91,14 @@ func (m *MockTaskHooks) UnblockStart(source string) {
m.Unblocked = true
}
// EmitEvent implements the TaskHooks EmitEvent method for the mock: it
// records the message in the Events slice and signals EmitEventCh so a
// waiting test can observe that an emit occurred. The source argument is
// intentionally ignored by this mock.
func (m *MockTaskHooks) EmitEvent(source, message string) {
m.Events = append(m.Events, message)
// Non-blocking send: if the buffered slot is already occupied and no
// test goroutine is receiving, drop the notification rather than block.
select {
case m.EmitEventCh <- struct{}{}:
default:
}
}
// testHarness is used to test the TaskTemplateManager by spinning up
// Consul/Vault as needed
type testHarness struct {

View File

@@ -189,7 +189,7 @@ func (s *taskRunnerState) Hash() []byte {
}
// TaskStateUpdater is used to signal that a task's state has changed.
type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent)
type TaskStateUpdater func(taskName, state string, event *structs.TaskEvent, lazySync bool)
// SignalEvent is a tuple of the signal and the event generating it
type SignalEvent struct {
@@ -251,7 +251,7 @@ func NewTaskRunner(logger *log.Logger, config *config.Config,
// MarkReceived marks the task as received.
func (r *TaskRunner) MarkReceived() {
r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived))
r.updater(r.task.Name, structs.TaskStatePending, structs.NewTaskEvent(structs.TaskReceived), false)
}
// WaitCh returns a channel to wait for termination
@@ -498,14 +498,14 @@ func (r *TaskRunner) DestroyState() error {
}
// setState is used to update the state of the task runner
func (r *TaskRunner) setState(state string, event *structs.TaskEvent) {
func (r *TaskRunner) setState(state string, event *structs.TaskEvent, lazySync bool) {
// Persist our state to disk.
if err := r.SaveState(); err != nil {
r.logger.Printf("[ERR] client: failed to save state of Task Runner for task %q: %v", r.task.Name, err)
}
// Indicate the task has been updated.
r.updater(r.task.Name, state, event)
r.updater(r.task.Name, state, event, lazySync)
}
// createDriver makes a driver for the task
@@ -515,7 +515,7 @@ func (r *TaskRunner) createDriver() (driver.Driver, error) {
eventEmitter := func(m string, args ...interface{}) {
msg := fmt.Sprintf(m, args...)
r.logger.Printf("[DEBUG] client: driver event for alloc %q: %s", r.alloc.ID, msg)
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg))
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDriverMessage).SetDriverMessage(msg), false)
}
driverCtx := driver.NewDriverContext(r.task.Name, r.alloc.ID, r.config, r.config.Node, r.logger, eventEmitter)
@@ -537,7 +537,8 @@ func (r *TaskRunner) Run() {
if err := r.validateTask(); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err).SetFailsTask())
structs.NewTaskEvent(structs.TaskFailedValidation).SetValidationError(err).SetFailsTask(),
false)
return
}
@@ -549,7 +550,8 @@ func (r *TaskRunner) Run() {
e := fmt.Errorf("failed to create driver of task %q for alloc %q: %v", r.task.Name, r.alloc.ID, err)
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask())
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask(),
false)
return
}
@@ -560,7 +562,8 @@ func (r *TaskRunner) Run() {
e := fmt.Errorf("failed to build task directory for %q: %v", r.task.Name, err)
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask())
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(e).SetFailsTask(),
false)
return
}
@@ -858,7 +861,9 @@ func (r *TaskRunner) updatedTokenHandler() {
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
r.setState(structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(),
false)
r.logger.Printf("[ERR] client: alloc %q, task %q %v", r.alloc.ID, r.task.Name, err)
r.Kill("vault", err.Error(), true)
return
@@ -893,7 +898,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res
if err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(),
false)
resultCh <- false
return
}
@@ -901,7 +907,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res
if err := os.MkdirAll(filepath.Dir(renderTo), 07777); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(),
false)
resultCh <- false
return
}
@@ -909,7 +916,8 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res
if err := ioutil.WriteFile(renderTo, decoded, 0777); err != nil {
r.setState(
structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(),
false)
resultCh <- false
return
}
@@ -924,14 +932,14 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res
// Download the task's artifacts
if !downloaded && len(task.Artifacts) > 0 {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts))
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskDownloadingArtifacts), false)
taskEnv := r.envBuilder.Build()
for _, artifact := range task.Artifacts {
if err := getter.GetArtifact(taskEnv, artifact, r.taskDir.Dir); err != nil {
wrapped := fmt.Errorf("failed to download artifact %q: %v", artifact.GetterSource, err)
r.logger.Printf("[DEBUG] client: %v", wrapped)
r.setState(structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped))
structs.NewTaskEvent(structs.TaskArtifactDownloadFailed).SetDownloadError(wrapped), false)
r.restartTracker.SetStartError(structs.WrapRecoverable(wrapped.Error(), err))
goto RESTART
}
@@ -961,7 +969,7 @@ func (r *TaskRunner) prestart(alloc *structs.Allocation, task *structs.Task, res
r.config, r.vaultFuture.Get(), r.taskDir.Dir, r.envBuilder)
if err != nil {
err := fmt.Errorf("failed to build task's template manager: %v", err)
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask())
r.setState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskSetupFailure).SetSetupError(err).SetFailsTask(), false)
r.logger.Printf("[ERR] client: alloc %q, task %q %v", alloc.ID, task.Name, err)
resultCh <- false
return
@@ -1032,7 +1040,7 @@ func (r *TaskRunner) run() {
case success := <-prestartResultCh:
if !success {
r.cleanup()
r.setState(structs.TaskStateDead, nil)
r.setState(structs.TaskStateDead, nil, false)
return
}
case <-r.startCh:
@@ -1044,12 +1052,12 @@ func (r *TaskRunner) run() {
startErr := r.startTask()
r.restartTracker.SetStartError(startErr)
if startErr != nil {
r.setState("", structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr))
r.setState("", structs.NewTaskEvent(structs.TaskDriverFailure).SetDriverError(startErr), true)
goto RESTART
}
// Mark the task as started
r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
r.setState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted), false)
r.runningLock.Lock()
r.running = true
r.runningLock.Unlock()
@@ -1076,7 +1084,7 @@ func (r *TaskRunner) run() {
// Log whether the task was successful or not.
r.restartTracker.SetWaitResult(waitRes)
r.setState("", r.waitErrorToEvent(waitRes))
r.setState("", r.waitErrorToEvent(waitRes), true)
if !waitRes.Successful() {
r.logger.Printf("[INFO] client: task %q for alloc %q failed: %v", r.task.Name, r.alloc.ID, waitRes)
} else {
@@ -1102,7 +1110,7 @@ func (r *TaskRunner) run() {
}
r.logger.Printf("[DEBUG] client: sending %s", common)
r.setState(structs.TaskStateRunning, se.e)
r.setState(structs.TaskStateRunning, se.e, false)
res := r.handle.Signal(se.s)
se.result <- res
@@ -1118,7 +1126,7 @@ func (r *TaskRunner) run() {
}
r.logger.Printf("[DEBUG] client: restarting %s: %v", common, event.RestartReason)
r.setState(structs.TaskStateRunning, event)
r.setState(structs.TaskStateRunning, event, false)
r.killTask(nil)
close(stopCollection)
@@ -1138,7 +1146,7 @@ func (r *TaskRunner) run() {
r.runningLock.Unlock()
if !running {
r.cleanup()
r.setState(structs.TaskStateDead, r.destroyEvent)
r.setState(structs.TaskStateDead, r.destroyEvent, false)
return
}
@@ -1155,7 +1163,7 @@ func (r *TaskRunner) run() {
if r.destroyEvent.Type == structs.TaskKilling {
killEvent = r.destroyEvent
} else {
r.setState(structs.TaskStateRunning, r.destroyEvent)
r.setState(structs.TaskStateRunning, r.destroyEvent, false)
}
}
@@ -1166,7 +1174,7 @@ func (r *TaskRunner) run() {
<-handleWaitCh
r.cleanup()
r.setState(structs.TaskStateDead, nil)
r.setState(structs.TaskStateDead, nil, false)
return
}
}
@@ -1176,7 +1184,7 @@ func (r *TaskRunner) run() {
restart := r.shouldRestart()
if !restart {
r.cleanup()
r.setState(structs.TaskStateDead, nil)
r.setState(structs.TaskStateDead, nil, false)
return
}
@@ -1240,7 +1248,8 @@ func (r *TaskRunner) shouldRestart() bool {
if state == structs.TaskNotRestarting {
r.setState(structs.TaskStateDead,
structs.NewTaskEvent(structs.TaskNotRestarting).
SetRestartReason(reason).SetFailsTask())
SetRestartReason(reason).SetFailsTask(),
false)
}
return false
case structs.TaskRestarting:
@@ -1248,7 +1257,8 @@ func (r *TaskRunner) shouldRestart() bool {
r.setState(structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskRestarting).
SetRestartDelay(when).
SetRestartReason(reason))
SetRestartReason(reason),
false)
default:
r.logger.Printf("[ERR] client: restart tracker returned unknown state: %q", state)
return false
@@ -1270,7 +1280,7 @@ func (r *TaskRunner) shouldRestart() bool {
r.destroyLock.Unlock()
if destroyed {
r.logger.Printf("[DEBUG] client: Not restarting task: %v because it has been destroyed", r.task.Name)
r.setState(structs.TaskStateDead, r.destroyEvent)
r.setState(structs.TaskStateDead, r.destroyEvent, false)
return false
}
@@ -1302,7 +1312,7 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) {
event.SetKillTimeout(timeout)
// Mark that we received the kill event
r.setState(structs.TaskStateRunning, event)
r.setState(structs.TaskStateRunning, event, false)
handle := r.getHandle()
@@ -1318,7 +1328,7 @@ func (r *TaskRunner) killTask(killingEvent *structs.TaskEvent) {
r.runningLock.Unlock()
// Store that the task has been destroyed and any associated error.
r.setState("", structs.NewTaskEvent(structs.TaskKilled).SetKillError(err))
r.setState("", structs.NewTaskEvent(structs.TaskKilled).SetKillError(err), true)
}
// startTask creates the driver, task dir, and starts the task.
@@ -1436,8 +1446,9 @@ func (r *TaskRunner) buildTaskDir(fsi cstructs.FSIsolation) error {
// and the task dir is already built. The reason we call Build again is to
// ensure that the task dir invariants are still held.
if !built {
r.setState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskSetup).
SetMessage(structs.TaskBuildingTaskDir))
r.setState(structs.TaskStatePending,
structs.NewTaskEvent(structs.TaskSetup).SetMessage(structs.TaskBuildingTaskDir),
false)
}
chroot := config.DefaultChrootEnv
@@ -1662,6 +1673,14 @@ func (r *TaskRunner) Kill(source, reason string, fail bool) {
r.Destroy(event)
}
// EmitEvent stores a generic task event (TaskGenericMessage) on the task,
// tagging it with the originating subsystem via GenericSource and the
// human-readable text via Message, and logs the emit at DEBUG level.
// Passing "" as the state appends the event without forcing a task-state
// transition; lazySync=false so the event is synced to the server.
func (r *TaskRunner) EmitEvent(source, message string) {
event := structs.NewTaskEvent(structs.TaskGenericMessage).
SetGenericSource(source).SetMessage(message)
r.setState("", event, false)
r.logger.Printf("[DEBUG] client: event from %q for task %q in alloc %q: %v",
source, r.task.Name, r.alloc.ID, message)
}
// UnblockStart unblocks the starting of the task. It currently assumes only
// consul-template will unblock
func (r *TaskRunner) UnblockStart(source string) {

View File

@@ -41,7 +41,7 @@ type MockTaskStateUpdater struct {
events []*structs.TaskEvent
}
func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent) {
func (m *MockTaskStateUpdater) Update(name, state string, event *structs.TaskEvent, _ bool) {
if state != "" {
m.state = state
}

View File

@@ -398,6 +398,9 @@ func (c *AllocStatusCommand) outputTaskStatus(state *api.TaskState) {
desc = event.DriverMessage
case api.TaskLeaderDead:
desc = "Leader Task in Group dead"
case api.TaskGenericMessage:
event.Type = event.GenericSource
desc = event.Message
}
// Reverse order so we are sorted by time

View File

@@ -3539,6 +3539,9 @@ const (
// TaskLeaderDead indicates that the leader task within the task group has finished.
TaskLeaderDead = "Leader Task Dead"
// TaskGenericMessage is used by various subsystems to emit a message.
TaskGenericMessage = "Generic"
)
// TaskEvent is an event that affects the state of a task and contains meta-data
@@ -3600,6 +3603,9 @@ type TaskEvent struct {
// DriverMessage indicates a driver action being taken.
DriverMessage string
// GenericSource is the source of a message.
GenericSource string
}
func (te *TaskEvent) GoString() string {
@@ -3739,6 +3745,11 @@ func (e *TaskEvent) SetDriverMessage(m string) *TaskEvent {
return e
}
// SetGenericSource sets GenericSource — the subsystem that emitted a
// TaskGenericMessage event — and returns the event to allow call chaining,
// matching the other TaskEvent Set* builder methods.
func (e *TaskEvent) SetGenericSource(s string) *TaskEvent {
e.GenericSource = s
return e
}
// TaskArtifact is an artifact to download before running the task.
type TaskArtifact struct {
// GetterSource is the source to download an artifact using go-getter