From 17cab6ebd7269a53885116d557b940a545f94a89 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 9 Nov 2015 12:36:07 -0800 Subject: [PATCH 1/6] Updating snapshots of a TaskRunner when status of Task changes --- client/alloc_runner.go | 25 +++++++++++++++++++------ client/client.go | 2 +- client/task_runner.go | 4 ++++ 3 files changed, 24 insertions(+), 7 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 1504900c1..eec492f5a 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -119,7 +119,7 @@ func (r *AllocRunner) RestoreState() error { } // SaveState is used to snapshot our state -func (r *AllocRunner) SaveState() error { +func (r *AllocRunner) SaveState(taskName string) error { r.taskStatusLock.RLock() snap := allocRunnerState{ Alloc: r.alloc, @@ -137,16 +137,28 @@ func (r *AllocRunner) SaveState() error { r.taskLock.RLock() defer r.taskLock.RUnlock() var mErr multierror.Error - for name, tr := range r.tasks { - if err := tr.SaveState(); err != nil { - r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v", - r.alloc.ID, name, err) - mErr.Errors = append(mErr.Errors, err) + if taskName != "" { + tr, ok := r.tasks[taskName] + if !ok { + mErr.Errors = append(mErr.Errors, fmt.Errorf("[ERR] client: Task with name %v not found in alloc runner %v", taskName, r.alloc.Name)) } + r.saveTaskRunnerState(tr, &mErr) + return mErr.ErrorOrNil() + } + for _, tr := range r.tasks { + r.saveTaskRunnerState(tr, &mErr) } return mErr.ErrorOrNil() } +func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner, mErr *multierror.Error) { + if err := tr.SaveState(); err != nil { + r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v", + r.alloc.ID, tr.task.Name, err) + mErr.Errors = append(mErr.Errors, err) + } +} + // DestroyState is used to cleanup after ourselves func (r *AllocRunner) DestroyState() error { return os.RemoveAll(filepath.Dir(r.stateFilePath())) @@ -257,6 +269,7 @@ func 
(r *AllocRunner) setTaskStatus(taskName, status, desc string) { Description: desc, } r.taskStatusLock.Unlock() + r.SaveState(taskName) select { case r.dirtyCh <- struct{}{}: default: diff --git a/client/client.go b/client/client.go index 029ac3954..714473d67 100644 --- a/client/client.go +++ b/client/client.go @@ -351,7 +351,7 @@ func (c *Client) saveState() error { c.allocLock.RLock() defer c.allocLock.RUnlock() for id, ar := range c.allocs { - if err := ar.SaveState(); err != nil { + if err := ar.SaveState(""); err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) mErr.Errors = append(mErr.Errors, err) diff --git a/client/task_runner.go b/client/task_runner.go index b54b7604b..88e746b1f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -32,6 +32,8 @@ type TaskRunner struct { destroyCh chan struct{} destroyLock sync.Mutex waitCh chan struct{} + + snapshotLock sync.Mutex } // taskRunnerState is used to snapshot the state of the task runner @@ -112,6 +114,8 @@ func (r *TaskRunner) RestoreState() error { // SaveState is used to snapshot our state func (r *TaskRunner) SaveState() error { + r.snapshotLock.Lock() + defer r.snapshotLock.Unlock() snap := taskRunnerState{ Task: r.task, } From b350fc8f7f720795c3e3a5af34ea02540bb64fd9 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 9 Nov 2015 12:44:14 -0800 Subject: [PATCH 2/6] Returning once we have an error while saving a task that doesn't exist --- client/alloc_runner.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index eec492f5a..f74d176ad 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -138,11 +138,11 @@ func (r *AllocRunner) SaveState(taskName string) error { defer r.taskLock.RUnlock() var mErr multierror.Error if taskName != "" { - tr, ok := r.tasks[taskName] - if !ok { + if tr, ok := r.tasks[taskName]; ok { + r.saveTaskRunnerState(tr, &mErr) + } else { 
mErr.Errors = append(mErr.Errors, fmt.Errorf("[ERR] client: Task with name %v not found in alloc runner %v", taskName, r.alloc.Name)) } - r.saveTaskRunnerState(tr, &mErr) return mErr.ErrorOrNil() } for _, tr := range r.tasks { From a797f11f74cddfcf999c57f6f3582017ae5fe571 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 9 Nov 2015 16:15:11 -0800 Subject: [PATCH 3/6] Refactored Save State of Alloc runner --- client/alloc_runner.go | 37 ++++++++++++++++++++++--------------- client/client.go | 2 +- 2 files changed, 23 insertions(+), 16 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index f74d176ad..1533aea92 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -118,8 +118,11 @@ func (r *AllocRunner) RestoreState() error { return mErr.ErrorOrNil() } -// SaveState is used to snapshot our state -func (r *AllocRunner) SaveState(taskName string) error { +// SaveState is used to snapshot the state of the alloc runner +// if the fullSync is marked as false only the state of the Alloc Runner +// is snapshotted. 
If fullSync is marked as true, we snapshot +// all the Task Runners associated with the Alloc +func (r *AllocRunner) SaveState(fullSync bool) error { r.taskStatusLock.RLock() snap := allocRunnerState{ Alloc: r.alloc, @@ -133,30 +136,29 @@ func (r *AllocRunner) SaveState(taskName string) error { return err } + if !fullSync { + return nil + } + // Save state for each task r.taskLock.RLock() defer r.taskLock.RUnlock() var mErr multierror.Error - if taskName != "" { - if tr, ok := r.tasks[taskName]; ok { - r.saveTaskRunnerState(tr, &mErr) - } else { - mErr.Errors = append(mErr.Errors, fmt.Errorf("[ERR] client: Task with name %v not found in alloc runner %v", taskName, r.alloc.Name)) - } - return mErr.ErrorOrNil() - } for _, tr := range r.tasks { - r.saveTaskRunnerState(tr, &mErr) + if err := r.saveTaskRunnerState(tr); err != nil { + mErr.Errors = append(mErr.Errors, err) + } } return mErr.ErrorOrNil() } -func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner, mErr *multierror.Error) { - if err := tr.SaveState(); err != nil { +func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { + var err error + if err = tr.SaveState(); err != nil { r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v", r.alloc.ID, tr.task.Name, err) - mErr.Errors = append(mErr.Errors, err) } + return err } // DestroyState is used to cleanup after ourselves @@ -201,6 +203,9 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { for { err := r.syncStatus() if err == nil { + // The Alloc State might have been re-computed so we are + // snapshoting only the alloc runner + r.SaveState(false) return } select { @@ -269,7 +274,9 @@ func (r *AllocRunner) setTaskStatus(taskName, status, desc string) { Description: desc, } r.taskStatusLock.Unlock() - r.SaveState(taskName) + if tr, ok := r.tasks[taskName]; ok { + r.saveTaskRunnerState(tr) + } select { case r.dirtyCh <- struct{}{}: default: diff --git a/client/client.go b/client/client.go index 714473d67..16a5b29e9 
100644 --- a/client/client.go +++ b/client/client.go @@ -351,7 +351,7 @@ func (c *Client) saveState() error { c.allocLock.RLock() defer c.allocLock.RUnlock() for id, ar := range c.allocs { - if err := ar.SaveState(""); err != nil { + if err := ar.SaveState(true); err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) mErr.Errors = append(mErr.Errors, err) From d6de54836ef9f117d639270c3a0ad86858dfe2a0 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 9 Nov 2015 16:45:42 -0800 Subject: [PATCH 4/6] Inlining error check --- client/alloc_runner.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 1533aea92..307256585 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -201,8 +201,7 @@ func (r *AllocRunner) dirtySyncState() { // retrySyncState is used to retry the state sync until success func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { for { - err := r.syncStatus() - if err == nil { + if err := r.syncStatus(); err == nil { // The Alloc State might have been re-computed so we are // snapshoting only the alloc runner r.SaveState(false) From 056d113ded962c5e77942f30c2fd5332cb6697f4 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 9 Nov 2015 16:59:02 -0800 Subject: [PATCH 5/6] Extracted a method to save the alloc runner state --- client/alloc_runner.go | 37 +++++++++++++++++++------------------ client/client.go | 2 +- 2 files changed, 20 insertions(+), 19 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 307256585..d6f7f3c84 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -122,23 +122,8 @@ func (r *AllocRunner) RestoreState() error { // if the fullSync is marked as false only the state of the Alloc Runner // is snapshotted.
If fullSync is marked as true, we snapshot // all the Task Runners associated with the Alloc -func (r *AllocRunner) SaveState(fullSync bool) error { - r.taskStatusLock.RLock() - snap := allocRunnerState{ - Alloc: r.alloc, - RestartPolicy: r.RestartPolicy, - TaskStatus: r.taskStatus, - Context: r.ctx, - } - err := persistState(r.stateFilePath(), &snap) - r.taskStatusLock.RUnlock() - if err != nil { - return err - } - - if !fullSync { - return nil - } +func (r *AllocRunner) SaveState() error { + r.saveAllocRunnerState() // Save state for each task r.taskLock.RLock() @@ -152,6 +137,22 @@ func (r *AllocRunner) SaveState(fullSync bool) error { return mErr.ErrorOrNil() } +func (r *AllocRunner) saveAllocRunnerState() error { + r.taskStatusLock.RLock() + defer r.taskStatusLock.RUnlock() + snap := allocRunnerState{ + Alloc: r.alloc, + RestartPolicy: r.RestartPolicy, + TaskStatus: r.taskStatus, + Context: r.ctx, + } + err := persistState(r.stateFilePath(), &snap) + if err != nil { + return err + } + return nil +} + func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error { var err error if err = tr.SaveState(); err != nil { @@ -204,7 +205,7 @@ func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { if err := r.syncStatus(); err == nil { // The Alloc State might have been re-computed so we are // snapshoting only the alloc runner - r.SaveState(false) + r.saveAllocRunnerState() return } select { diff --git a/client/client.go b/client/client.go index 16a5b29e9..029ac3954 100644 --- a/client/client.go +++ b/client/client.go @@ -351,7 +351,7 @@ func (c *Client) saveState() error { c.allocLock.RLock() defer c.allocLock.RUnlock() for id, ar := range c.allocs { - if err := ar.SaveState(true); err != nil { + if err := ar.SaveState(); err != nil { c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v", id, err) mErr.Errors = append(mErr.Errors, err) From bd82b0f1ce5eb78a97c35080bc802aceee639ca6 Mon Sep 17 00:00:00 2001 From: Diptanu Choudhury Date: Mon, 9 
Nov 2015 17:04:53 -0800 Subject: [PATCH 6/6] Returning back if alloc runner isn't saved --- client/alloc_runner.go | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index d6f7f3c84..b0dbde878 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -123,7 +123,9 @@ func (r *AllocRunner) RestoreState() error { // is snapshotted. If fullSync is marked as true, we snapshot // all the Task Runners associated with the Alloc func (r *AllocRunner) SaveState() error { - r.saveAllocRunnerState() + if err := r.saveAllocRunnerState(); err != nil { + return err + } // Save state for each task r.taskLock.RLock() @@ -146,11 +148,7 @@ func (r *AllocRunner) saveAllocRunnerState() error { TaskStatus: r.taskStatus, Context: r.ctx, } - err := persistState(r.stateFilePath(), &snap) - if err != nil { - return err - } - return nil + return persistState(r.stateFilePath(), &snap) } func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner) error {