From 0892aca3fcb276d21735ae72a747aae4dfe15faa Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 27 Jun 2018 17:27:03 -0700 Subject: [PATCH] begin adding AllocRunner.Update --- client/allocrunnerv2/alloc_runner.go | 85 +++++++++++-------- client/allocrunnerv2/alloc_runner_hooks.go | 2 +- .../allocrunnerv2/taskrunner/task_runner.go | 7 +- .../taskrunner/task_runner_hooks.go | 2 - 4 files changed, 55 insertions(+), 41 deletions(-) diff --git a/client/allocrunnerv2/alloc_runner.go b/client/allocrunnerv2/alloc_runner.go index 8eaa4112e..0da2a5faa 100644 --- a/client/allocrunnerv2/alloc_runner.go +++ b/client/allocrunnerv2/alloc_runner.go @@ -1,7 +1,6 @@ package allocrunnerv2 import ( - "context" "fmt" "sync" @@ -16,13 +15,6 @@ import ( // allocRunner is used to run all the tasks in a given allocation type allocRunner struct { - // ctx is the alloc runners context. It is cancelled when all related - // activity for this allocation should be terminated. - ctx context.Context - - // ctxCancel is used to cancel the alloc runners cancel - ctxCancel context.CancelFunc - // Logger is the logger for the alloc runner. logger log.Logger @@ -48,21 +40,19 @@ type allocRunner struct { runnerHooks []interfaces.RunnerHook // tasks are the set of task runners - tasks map[string]*taskrunner.TaskRunner - taskLock sync.Mutex + tasks map[string]*taskrunner.TaskRunner + + // updateCh receives allocation updates + updateCh chan *structs.Allocation } // NewAllocRunner returns a new allocation runner. -func NewAllocRunner(ctx context.Context, config *config.Config) (*allocRunner, error) { - // Create a context for the runner - arCtx, arCancel := context.WithCancel(ctx) - +func NewAllocRunner(config *config.Config) (*allocRunner, error) { ar := &allocRunner{ - ctx: arCtx, - ctxCancel: arCancel, - config: config, - alloc: config.Allocation, - waitCh: make(chan struct{}), + config: config, + alloc: config.Allocation, + waitCh: make(chan struct{}), + updateCh: make(chan *structs.Allocation), } // Create the logger based on the allocation ID @@ -84,6 +74,9 @@ func (ar *allocRunner) Run() { // Close the wait channel defer close(ar.waitCh) + var err error + var taskWaitCh <-chan struct{} + // Run the prerun hooks // XXX Equivalent to TR.Prerun hook if err := ar.prerun(); err != nil { @@ -92,10 +85,25 @@ func (ar *allocRunner) Run() { } // Run the runners - if err := ar.runImpl(); err != nil { + taskWaitCh, err = ar.runImpl() + if err != nil { ar.logger.Error("starting tasks failed", "error", err) } + for { + select { + case <-taskWaitCh: + // TaskRunners have all exited + case updated := <-ar.updateCh: + // Updated alloc received + //XXX Update hooks + //XXX Update ar.alloc + for _, tr := range ar.tasks { + tr.Update(updated) + } + } + } + POST: // Run the postrun hooks // XXX Equivalent to TR.Poststop hook @@ -105,35 +113,32 @@ POST: } // runImpl is used to run the runners. -func (ar *allocRunner) runImpl() error { +func (ar *allocRunner) runImpl() (<-chan struct{}, error) { // Grab the task group tg := ar.alloc.Job.LookupTaskGroup(ar.alloc.TaskGroup) if tg == nil { // XXX Fail and exit ar.logger.Error("failed to lookup task group", "task_group", ar.alloc.TaskGroup) - return fmt.Errorf("failed to lookup task group %q", ar.alloc.TaskGroup) + return nil, fmt.Errorf("failed to lookup task group %q", ar.alloc.TaskGroup) } for _, task := range tg.Tasks { if err := ar.runTask(task); err != nil { - return err + return nil, err } } - // Block until all tasks are done. - var wg sync.WaitGroup - ar.taskLock.Lock() - for _, task := range ar.tasks { - wg.Add(1) - go func() { + // Return a combined WaitCh that is closed when all task runners have + // exited. + waitCh := make(chan struct{}) + go func() { + defer close(waitCh) + for _, task := range ar.tasks { <-task.WaitCh() - wg.Done() - }() - } - ar.taskLock.Unlock() + } + }() - wg.Wait() - return nil + return waitCh, nil } // runTask is used to run a task. @@ -153,8 +158,14 @@ func (ar *allocRunner) runTask(task *structs.Task) error { go tr.Run() // Store the runner - ar.taskLock.Lock() ar.tasks[task.Name] = tr - ar.taskLock.Unlock() return nil } + +// Update the running allocation with a new version received from the server. +// +// This method is safe for calling concurrently with Run() and does not modify +// the passed in allocation. +func (ar *allocRunner) Update(update *structs.Allocation) { + ar.updateCh <- update +} diff --git a/client/allocrunnerv2/alloc_runner_hooks.go b/client/allocrunnerv2/alloc_runner_hooks.go index 085020813..e39d3fbdc 100644 --- a/client/allocrunnerv2/alloc_runner_hooks.go +++ b/client/allocrunnerv2/alloc_runner_hooks.go @@ -143,7 +143,7 @@ type allocHealthWatcherHook struct { } func newAllocHealthWatcherHook(runner *allocRunner, logger log.Logger) *allocHealthWatcherHook { - ctx, cancelFn := context.WithCancel(runner.ctx) + ctx, cancelFn := context.WithCancel(context.Background()) ad := &allocHealthWatcherHook{ runner: runner, ctx: ctx, diff --git a/client/allocrunnerv2/taskrunner/task_runner.go b/client/allocrunnerv2/taskrunner/task_runner.go index aae5ebc86..28b47a58c 100644 --- a/client/allocrunnerv2/taskrunner/task_runner.go +++ b/client/allocrunnerv2/taskrunner/task_runner.go @@ -177,11 +177,16 @@ func (tr *TaskRunner) initLabels() { } } +// WaitCh is closed when TaskRunner.Run exits. func (tr *TaskRunner) WaitCh() <-chan struct{} { return tr.waitCh } -func (tr *TaskRunner) Update() { +// Update the running allocation with a new version received from the server. +// +// This method is safe for calling concurrently with Run() and does not modify +// the passed in allocation. +func (tr *TaskRunner) Update(update *structs.Allocation) { // XXX } diff --git a/client/allocrunnerv2/taskrunner/task_runner_hooks.go b/client/allocrunnerv2/taskrunner/task_runner_hooks.go index 169403a19..4f050a23a 100644 --- a/client/allocrunnerv2/taskrunner/task_runner_hooks.go +++ b/client/allocrunnerv2/taskrunner/task_runner_hooks.go @@ -17,12 +17,10 @@ import ( // initHooks intializes the tasks hooks. func (tr *TaskRunner) initHooks() { hookLogger := tr.logger.Named("task_hook") - tr.runnerHooks = make([]interfaces.TaskHook, 0, 3) // Create the task directory hook. This is run first to ensure the // directoy path exists for other hooks. tr.runnerHooks = []interfaces.TaskHook{ - tr.runnerHooks, newTaskDirHook(tr, hookLogger), newArtifactHook(tr, hookLogger), }