begin adding AllocRunner.Update

This commit is contained in:
Michael Schurter
2018-06-27 17:27:03 -07:00
parent 15c0731096
commit 0892aca3fc
4 changed files with 55 additions and 41 deletions

View File

@@ -1,7 +1,6 @@
package allocrunnerv2
import (
"context"
"fmt"
"sync"
@@ -16,13 +15,6 @@ import (
// allocRunner is used to run all the tasks in a given allocation
type allocRunner struct {
// ctx is the alloc runners context. It is cancelled when all related
// activity for this allocation should be terminated.
ctx context.Context
// ctxCancel is used to cancel the alloc runners cancel
ctxCancel context.CancelFunc
// Logger is the logger for the alloc runner.
logger log.Logger
@@ -48,21 +40,19 @@ type allocRunner struct {
runnerHooks []interfaces.RunnerHook
// tasks are the set of task runners
tasks map[string]*taskrunner.TaskRunner
taskLock sync.Mutex
tasks map[string]*taskrunner.TaskRunner
// updateCh receives allocation updates
updateCh chan *structs.Allocation
}
// NewAllocRunner returns a new allocation runner.
func NewAllocRunner(ctx context.Context, config *config.Config) (*allocRunner, error) {
// Create a context for the runner
arCtx, arCancel := context.WithCancel(ctx)
func NewAllocRunner(config *config.Config) (*allocRunner, error) {
ar := &allocRunner{
ctx: arCtx,
ctxCancel: arCancel,
config: config,
alloc: config.Allocation,
waitCh: make(chan struct{}),
config: config,
alloc: config.Allocation,
waitCh: make(chan struct{}),
updateCh: make(chan *structs.Allocation),
}
// Create the logger based on the allocation ID
@@ -84,6 +74,9 @@ func (ar *allocRunner) Run() {
// Close the wait channel
defer close(ar.waitCh)
var err error
var taskWaitCh <-chan struct{}
// Run the prerun hooks
// XXX Equivalent to TR.Prerun hook
if err := ar.prerun(); err != nil {
@@ -92,10 +85,25 @@ func (ar *allocRunner) Run() {
}
// Run the runners
if err := ar.runImpl(); err != nil {
taskWaitCh, err = ar.runImpl()
if err != nil {
ar.logger.Error("starting tasks failed", "error", err)
}
for {
select {
case <-taskWaitCh:
// TaskRunners have all exited
case updated := <-ar.updateCh:
// Updated alloc received
//XXX Update hooks
//XXX Update ar.alloc
for _, tr := range ar.tasks {
tr.Update(updated)
}
}
}
POST:
// Run the postrun hooks
// XXX Equivalent to TR.Poststop hook
@@ -105,35 +113,32 @@ POST:
}
// runImpl is used to run the runners.
func (ar *allocRunner) runImpl() error {
func (ar *allocRunner) runImpl() (<-chan struct{}, error) {
// Grab the task group
tg := ar.alloc.Job.LookupTaskGroup(ar.alloc.TaskGroup)
if tg == nil {
// XXX Fail and exit
ar.logger.Error("failed to lookup task group", "task_group", ar.alloc.TaskGroup)
return fmt.Errorf("failed to lookup task group %q", ar.alloc.TaskGroup)
return nil, fmt.Errorf("failed to lookup task group %q", ar.alloc.TaskGroup)
}
for _, task := range tg.Tasks {
if err := ar.runTask(task); err != nil {
return err
return nil, err
}
}
// Block until all tasks are done.
var wg sync.WaitGroup
ar.taskLock.Lock()
for _, task := range ar.tasks {
wg.Add(1)
go func() {
// Return a combined WaitCh that is closed when all task runners have
// exited.
waitCh := make(chan struct{})
go func() {
defer close(waitCh)
for _, task := range ar.tasks {
<-task.WaitCh()
wg.Done()
}()
}
ar.taskLock.Unlock()
}
}()
wg.Wait()
return nil
return waitCh, nil
}
// runTask is used to run a task.
@@ -153,8 +158,14 @@ func (ar *allocRunner) runTask(task *structs.Task) error {
go tr.Run()
// Store the runner
ar.taskLock.Lock()
ar.tasks[task.Name] = tr
ar.taskLock.Unlock()
return nil
}
// Update the running allocation with a new version received from the server.
//
// This method is safe for calling concurrently with Run() and does not modify
// the passed in allocation.
func (ar *allocRunner) Update(update *structs.Allocation) {
ar.updateCh <- update
}

View File

@@ -143,7 +143,7 @@ type allocHealthWatcherHook struct {
}
func newAllocHealthWatcherHook(runner *allocRunner, logger log.Logger) *allocHealthWatcherHook {
ctx, cancelFn := context.WithCancel(runner.ctx)
ctx, cancelFn := context.WithCancel(context.Background())
ad := &allocHealthWatcherHook{
runner: runner,
ctx: ctx,

View File

@@ -177,11 +177,16 @@ func (tr *TaskRunner) initLabels() {
}
}
// WaitCh is closed when TaskRunner.Run exits.
func (tr *TaskRunner) WaitCh() <-chan struct{} {
return tr.waitCh
}
func (tr *TaskRunner) Update() {
// Update the running allocation with a new version received from the server.
//
// This method is safe for calling concurrently with Run() and does not modify
// the passed in allocation.
func (tr *TaskRunner) Update(update *structs.Allocation) {
// XXX
}

View File

@@ -17,12 +17,10 @@ import (
// initHooks intializes the tasks hooks.
func (tr *TaskRunner) initHooks() {
hookLogger := tr.logger.Named("task_hook")
tr.runnerHooks = make([]interfaces.TaskHook, 0, 3)
// Create the task directory hook. This is run first to ensure the
// directoy path exists for other hooks.
tr.runnerHooks = []interfaces.TaskHook{
tr.runnerHooks,
newTaskDirHook(tr, hookLogger),
newArtifactHook(tr, hookLogger),
}