From 5405313da1bb7b00357ba4b84a3eacc2decc6c60 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Mon, 10 Sep 2018 17:34:45 -0700 Subject: [PATCH] tr: refactor EmitEvents into Emit+Append * UpdateState: set state, append event, persist, update servers * EmitEvent: append event, persist, update servers * AppendEvent: append event, persist AppendEvent may not even have to persist, but for the sake of correctness I'm going with that for now. --- client/allocrunnerv2/taskrunner/lifecycle.go | 10 ++-- .../allocrunnerv2/taskrunner/task_runner.go | 48 +++++++++++++------ 2 files changed, 39 insertions(+), 19 deletions(-) diff --git a/client/allocrunnerv2/taskrunner/lifecycle.go b/client/allocrunnerv2/taskrunner/lifecycle.go index 521c0157d..48c477dd0 100644 --- a/client/allocrunnerv2/taskrunner/lifecycle.go +++ b/client/allocrunnerv2/taskrunner/lifecycle.go @@ -16,8 +16,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai return ErrTaskNotRunning } - // Emit the event - tr.EmitEvent(event) + // Append the event (it will get emitted by UpdateState) + tr.AppendEvent(event) // Tell the restart tracker that a restart triggered the exit tr.restartTracker.SetRestartTriggered(failure) @@ -66,8 +66,8 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error return ErrTaskNotRunning } - // Emit the event - tr.EmitEvent(event) + // Append the event (it will get emitted by UpdateState) + tr.AppendEvent(event) // Run the hooks prior to killing the task tr.kill() @@ -89,7 +89,7 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error } // Store that the task has been destroyed and any associated error. - tr.SetState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr)) + tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr)) if destroyErr != nil { return destroyErr diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index 6593be3e4..3118626ef 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -362,12 +362,12 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) { case structs.TaskNotRestarting, structs.TaskTerminated: tr.logger.Info("not restarting task", "reason", reason) if state == structs.TaskNotRestarting { - tr.SetState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask()) + tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask()) } return false, 0 case structs.TaskRestarting: tr.logger.Info("restarting task", "reason", reason, "delay", when) - tr.SetState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason)) + tr.UpdateState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason)) return true, 0 default: tr.logger.Error("restart tracker returned unknown state", "state", state) @@ -399,7 +399,7 @@ func (tr *TaskRunner) runDriver() error { tr.setDriverHandle(sresp.Handle) // Emit an event that we started - tr.SetState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) + tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted)) return nil } @@ -483,9 +483,9 @@ func (tr *TaskRunner) Restore() error { return nil } -// SetState sets the task runners allocation state and triggers a server +// UpdateState sets the task runners allocation state and triggers a server // update. -func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) { +func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) { tr.logger.Debug("setting task state", "state", state, "event", event.Type) // Update the local state stateCopy := tr.setStateLocal(state, event) @@ -502,7 +502,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str //XXX REMOVE ME AFTER TESTING if state == "" { - panic("SetState must not be called with an empty state") + panic("UpdateState must not be called with an empty state") } // Update the task state @@ -511,7 +511,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str taskState.State = state // Append the event - tr.emitEventImpl(event) + tr.appendEvent(event) // Handle the state transition. switch state { @@ -561,16 +561,37 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str } // EmitEvent appends a new TaskEvent to this task's TaskState. The actual -// TaskState.State (pending, running, dead) is not changed. Use SetState to +// TaskState.State (pending, running, dead) is not changed. Use UpdateState to // transition states. -// Events are persisted locally but errors are simply logged. +// Events are persisted locally and sent to the server, but errors are simply +// logged. Use AppendEvent to simply add a new event. func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { tr.stateLock.Lock() defer tr.stateLock.Unlock() - tr.emitEventImpl(event) + tr.appendEvent(event) + + if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil { + // Only a warning because the next event/state-transition will + // try to persist it again. + tr.logger.Warn("error persisting event", "error", err, "event", event) + } + + // Notify the alloc runner of the event + tr.stateUpdater.TaskStateUpdated(tr.taskName, tr.state.Copy()) +} + +// AppendEvent appends a new TaskEvent to this task's TaskState. The actual +// TaskState.State (pending, running, dead) is not changed. Use UpdateState to +// transition states. +// Events are persisted locally and errors are simply logged. Use EmitEvent +// also update AllocRunner. +func (tr *TaskRunner) AppendEvent(event *structs.TaskEvent) { + tr.stateLock.Lock() + defer tr.stateLock.Unlock() + + tr.appendEvent(event) - //XXX EmitEvents do not change TaskState.State so batch? if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil { // Only a warning because the next event/state-transition will // try to persist it again. @@ -578,9 +599,8 @@ func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) { } } -// emitEventImpl is the implementation of EmitEvent without the locking so it -// can be used from SetState. -func (tr *TaskRunner) emitEventImpl(event *structs.TaskEvent) error { +// appendEvent to task's event slice. Caller must acquire stateLock. +func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error { // Ensure the event is populated with human readable strings event.PopulateEventDisplayMessage()