From 17cab6ebd7269a53885116d557b940a545f94a89 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 9 Nov 2015 12:36:07 -0800 Subject: [PATCH] Updating snapshots of a TaskRunner when status of Task changes --- client/alloc_runner.go | 25 +++++++++++++++++++------ client/client.go | 2 +- client/task_runner.go | 4 ++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 1504900c1..eec492f5a 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -119,7 +119,7 @@ func (r *AllocRunner) RestoreState() error { } // SaveState is used to snapshot our state -func (r *AllocRunner) SaveState() error { +func (r *AllocRunner) SaveState(taskName string) error { r.taskStatusLock.RLock() snap := allocRunnerState{ Alloc: r.alloc, @@ -137,16 +137,28 @@ func (r *AllocRunner) SaveState() error { r.taskLock.RLock() defer r.taskLock.RUnlock() var mErr multierror.Error - for name, tr := range r.tasks { - if err := tr.SaveState(); err != nil { - r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v", - r.alloc.ID, name, err) - mErr.Errors = append(mErr.Errors, err) + if taskName != "" { + tr, ok := r.tasks[taskName] + if !ok { + mErr.Errors = append(mErr.Errors, fmt.Errorf("[ERR] client: Task with name %v not found in alloc runner %v", taskName, r.alloc.Name)) } + r.saveTaskRunnerState(tr, &mErr) + return mErr.ErrorOrNil() + } + for _, tr := range r.tasks { + r.saveTaskRunnerState(tr, &mErr) } return mErr.ErrorOrNil() } +func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner, mErr *multierror.Error) { + if err := tr.SaveState(); err != nil { + r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v", + r.alloc.ID, tr.task.Name, err) + mErr.Errors = append(mErr.Errors, err) + } +} + // DestroyState is used to cleanup after ourselves func (r *AllocRunner) DestroyState() error { return os.RemoveAll(filepath.Dir(r.stateFilePath())) @@ -257,6 +269,7 @@ func (r *AllocRunner) setTaskStatus(taskName, status, desc string) { Description: desc, } r.taskStatusLock.Unlock() + r.SaveState(taskName) select { case r.dirtyCh <- struct{}{}: default: diff --git a/client/client.go b/client/client.go index 029ac3954..714473d67 100644 --- a/client/client.go +++ b/client/client.go @@ -351,7 +351,7 @@ func (c *Client) saveState() error { c.allocLock.RLock() defer c.allocLock.RUnlock() for id, ar := range c.allocs { - if err := ar.SaveState(); err != nil { + if err := ar.SaveState(""); err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) mErr.Errors = append(mErr.Errors, err) diff --git a/client/task_runner.go b/client/task_runner.go index b54b7604b..88e746b1f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -32,6 +32,8 @@ type TaskRunner struct { destroyCh chan struct{} destroyLock sync.Mutex waitCh chan struct{} + + snapshotLock sync.Mutex } // taskRunnerState is used to snapshot the state of the task runner @@ -112,6 +114,8 @@ func (r *TaskRunner) RestoreState() error { // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { + r.snapshotLock.Lock() + defer r.snapshotLock.Unlock() snap := taskRunnerState{ Task: r.task, }