From ebf1aa0952682233acb149aaa8fa268bb96ba157 Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Wed, 18 Jul 2018 13:45:55 -0700 Subject: [PATCH] Define and thread through state updating interface --- client/allocrunnerv2/alloc_runner.go | 8 +++++ client/allocrunnerv2/interfaces/runner.go | 13 +++++++-- .../allocrunnerv2/taskrunner/task_runner.go | 29 ++++++++++++------- 3 files changed, 36 insertions(+), 14 deletions(-) diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go index 04c091267..73cb82b60 100644 --- a/client/allocrunnerv2/alloc_runner.go +++ b/client/allocrunnerv2/alloc_runner.go @@ -104,6 +104,7 @@ func (ar *allocRunner) initTaskRunners(tasks []*structs.Task) error { TaskDir: ar.allocDir.NewTaskDir(task.Name), Logger: ar.logger, StateDB: ar.stateDB, + StateUpdater: ar, VaultClient: ar.vaultClient, } @@ -218,6 +219,13 @@ func (ar *allocRunner) Restore() error { }) } +// TaskStateUpdated is called when a task's state has been updated. This hook is +// used to compute changes to the alloc's ClientStatus and to update the server +// with the new state. +func (ar *allocRunner) TaskStateUpdated(task string, state *structs.TaskState) error { + return nil +} + // Update the running allocation with a new version received from the server. // // This method is safe for calling concurrently with Run() and does not modify diff --git a/client/allocrunnerv2/interfaces/runner.go b/client/allocrunnerv2/interfaces/runner.go index a888484f7..73f6e9f1f 100644 --- a/client/allocrunnerv2/interfaces/runner.go +++ b/client/allocrunnerv2/interfaces/runner.go @@ -1,6 +1,9 @@ package interfaces -import "github.com/hashicorp/nomad/client/allocrunnerv2/state" +import ( + "github.com/hashicorp/nomad/client/allocrunnerv2/state" + "github.com/hashicorp/nomad/nomad/structs" +) // AllocRunner is the interface for an allocation runner. type AllocRunner interface { @@ -13,8 +16,12 @@ type AllocRunner interface { // State returns a copy of the runners state object State() *state.State + + TaskStateHandler } -// TaskRunner is the interface for a task runner. -type TaskRunner interface { +// TaskStateHandler exposes a handler to be called when a task's state changes +type TaskStateHandler interface { + // TaskStateUpdated is used to emit updated task state + TaskStateUpdated(task string, state *structs.TaskState) error } diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index e69bb31dd..a419fb0f8 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -60,6 +60,9 @@ type TaskRunner struct { clientConfig *config.Config + // stateUpdater is used to emit updated task state + stateUpdater interfaces.TaskStateHandler + // state captures the state of the task for updating the allocation state *structs.TaskState stateLock sync.Mutex @@ -146,6 +149,9 @@ type Config struct { // StateDB is used to store and restore state. StateDB *bolt.DB + + // StateUpdater is used to emit updated task state + StateUpdater interfaces.TaskStateHandler } func NewTaskRunner(config *Config) (*TaskRunner, error) { @@ -170,13 +176,14 @@ func NewTaskRunner(config *Config) (*TaskRunner, error) { envBuilder: envBuilder, vaultClient: config.VaultClient, //XXX Make a Copy to avoid races? - state: config.Alloc.TaskStates[config.Task.Name], - localState: config.LocalState, - stateDB: config.StateDB, - ctx: trCtx, - ctxCancel: trCancel, - updateCh: make(chan *structs.Allocation), - waitCh: make(chan struct{}), + state: config.Alloc.TaskStates[config.Task.Name], + localState: config.LocalState, + stateDB: config.StateDB, + stateUpdater: config.StateUpdater, + ctx: trCtx, + ctxCancel: trCancel, + updateCh: make(chan *structs.Allocation), + waitCh: make(chan struct{}), } // Create the logger based on the allocation ID @@ -575,10 +582,9 @@ func (tr *TaskRunner) SetState(state string, event *structs.TaskEvent) { } // Create a copy and notify the alloc runner of the transition - //FIXME <-------START HERE - //if err := tr.allocRunner.StateUpdated(tr.state.Copy()); err != nil { - //tr.logger.Error("failed to save state", "error", err) - //} + if err := tr.stateUpdater.TaskStateUpdated(tr.taskName, tr.state.Copy()); err != nil { + tr.logger.Error("failed to save state", "error", err) + } } // EmitEvent appends a new TaskEvent to this task's TaskState. The actual @@ -620,6 +626,7 @@ func (tr *TaskRunner) emitEventImpl(event *structs.TaskEvent) error { tr.state.Failed = true } + // XXX This seems like a super awkward spot for this? Why not shouldRestart? // Update restart metrics if event.Type == structs.TaskRestarting { if !tr.clientConfig.DisableTaggedMetrics {