mirror of
https://github.com/kemko/nomad.git
synced 2026-01-06 02:15:43 +03:00
tr: refactor EmitEvents into Emit+Append
* UpdateState: set state, append event, persist, update servers * EmitEvent: append event, persist, update servers * AppendEvent: append event, persist AppendEvent may not even have to persist, but for the sake of correctness I'm going with that for now.
This commit is contained in:
@@ -16,8 +16,8 @@ func (tr *TaskRunner) Restart(ctx context.Context, event *structs.TaskEvent, fai
|
||||
return ErrTaskNotRunning
|
||||
}
|
||||
|
||||
// Emit the event
|
||||
tr.EmitEvent(event)
|
||||
// Append the event (it will get emitted by UpdateState)
|
||||
tr.AppendEvent(event)
|
||||
|
||||
// Tell the restart tracker that a restart triggered the exit
|
||||
tr.restartTracker.SetRestartTriggered(failure)
|
||||
@@ -66,8 +66,8 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
|
||||
return ErrTaskNotRunning
|
||||
}
|
||||
|
||||
// Emit the event
|
||||
tr.EmitEvent(event)
|
||||
// Append the event (it will get emitted by UpdateState)
|
||||
tr.AppendEvent(event)
|
||||
|
||||
// Run the hooks prior to killing the task
|
||||
tr.kill()
|
||||
@@ -89,7 +89,7 @@ func (tr *TaskRunner) Kill(ctx context.Context, event *structs.TaskEvent) error
|
||||
}
|
||||
|
||||
// Store that the task has been destroyed and any associated error.
|
||||
tr.SetState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
|
||||
tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskKilled).SetKillError(destroyErr))
|
||||
|
||||
if destroyErr != nil {
|
||||
return destroyErr
|
||||
|
||||
@@ -362,12 +362,12 @@ func (tr *TaskRunner) shouldRestart() (bool, time.Duration) {
|
||||
case structs.TaskNotRestarting, structs.TaskTerminated:
|
||||
tr.logger.Info("not restarting task", "reason", reason)
|
||||
if state == structs.TaskNotRestarting {
|
||||
tr.SetState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask())
|
||||
tr.UpdateState(structs.TaskStateDead, structs.NewTaskEvent(structs.TaskNotRestarting).SetRestartReason(reason).SetFailsTask())
|
||||
}
|
||||
return false, 0
|
||||
case structs.TaskRestarting:
|
||||
tr.logger.Info("restarting task", "reason", reason, "delay", when)
|
||||
tr.SetState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason))
|
||||
tr.UpdateState(structs.TaskStatePending, structs.NewTaskEvent(structs.TaskRestarting).SetRestartDelay(when).SetRestartReason(reason))
|
||||
return true, 0
|
||||
default:
|
||||
tr.logger.Error("restart tracker returned unknown state", "state", state)
|
||||
@@ -399,7 +399,7 @@ func (tr *TaskRunner) runDriver() error {
|
||||
tr.setDriverHandle(sresp.Handle)
|
||||
|
||||
// Emit an event that we started
|
||||
tr.SetState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
|
||||
tr.UpdateState(structs.TaskStateRunning, structs.NewTaskEvent(structs.TaskStarted))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -483,9 +483,9 @@ func (tr *TaskRunner) Restore() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetState sets the task runners allocation state and triggers a server
|
||||
// UpdateState sets the task runners allocation state and triggers a server
|
||||
// update.
|
||||
func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) {
|
||||
func (tr *TaskRunner) UpdateState(state string, event *structs.TaskEvent) {
|
||||
tr.logger.Debug("setting task state", "state", state, "event", event.Type)
|
||||
// Update the local state
|
||||
stateCopy := tr.setStateLocal(state, event)
|
||||
@@ -502,7 +502,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str
|
||||
|
||||
//XXX REMOVE ME AFTER TESTING
|
||||
if state == "" {
|
||||
panic("SetState must not be called with an empty state")
|
||||
panic("UpdateState must not be called with an empty state")
|
||||
}
|
||||
|
||||
// Update the task state
|
||||
@@ -511,7 +511,7 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str
|
||||
taskState.State = state
|
||||
|
||||
// Append the event
|
||||
tr.emitEventImpl(event)
|
||||
tr.appendEvent(event)
|
||||
|
||||
// Handle the state transition.
|
||||
switch state {
|
||||
@@ -561,16 +561,37 @@ func (tr *TaskRunner) setStateLocal(state string, event *structs.TaskEvent) *str
|
||||
}
|
||||
|
||||
// EmitEvent appends a new TaskEvent to this task's TaskState. The actual
|
||||
// TaskState.State (pending, running, dead) is not changed. Use SetState to
|
||||
// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
|
||||
// transition states.
|
||||
// Events are persisted locally but errors are simply logged.
|
||||
// Events are persisted locally and sent to the server, but errors are simply
|
||||
// logged. Use AppendEvent to simply add a new event.
|
||||
func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
|
||||
tr.stateLock.Lock()
|
||||
defer tr.stateLock.Unlock()
|
||||
|
||||
tr.emitEventImpl(event)
|
||||
tr.appendEvent(event)
|
||||
|
||||
if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
|
||||
// Only a warning because the next event/state-transition will
|
||||
// try to persist it again.
|
||||
tr.logger.Warn("error persisting event", "error", err, "event", event)
|
||||
}
|
||||
|
||||
// Notify the alloc runner of the event
|
||||
tr.stateUpdater.TaskStateUpdated(tr.taskName, tr.state.Copy())
|
||||
}
|
||||
|
||||
// AppendEvent appends a new TaskEvent to this task's TaskState. The actual
|
||||
// TaskState.State (pending, running, dead) is not changed. Use UpdateState to
|
||||
// transition states.
|
||||
// Events are persisted locally and errors are simply logged. Use EmitEvent
|
||||
// also update AllocRunner.
|
||||
func (tr *TaskRunner) AppendEvent(event *structs.TaskEvent) {
|
||||
tr.stateLock.Lock()
|
||||
defer tr.stateLock.Unlock()
|
||||
|
||||
tr.appendEvent(event)
|
||||
|
||||
//XXX EmitEvents do not change TaskState.State so batch?
|
||||
if err := tr.stateDB.PutTaskState(tr.allocID, tr.taskName, tr.state); err != nil {
|
||||
// Only a warning because the next event/state-transition will
|
||||
// try to persist it again.
|
||||
@@ -578,9 +599,8 @@ func (tr *TaskRunner) EmitEvent(event *structs.TaskEvent) {
|
||||
}
|
||||
}
|
||||
|
||||
// emitEventImpl is the implementation of EmitEvent without the locking so it
|
||||
// can be used from SetState.
|
||||
func (tr *TaskRunner) emitEventImpl(event *structs.TaskEvent) error {
|
||||
// appendEvent to task's event slice. Caller must acquire stateLock.
|
||||
func (tr *TaskRunner) appendEvent(event *structs.TaskEvent) error {
|
||||
// Ensure the event is populated with human readable strings
|
||||
event.PopulateEventDisplayMessage()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user