mirror of
https://github.com/kemko/nomad.git
synced 2026-01-08 19:35:41 +03:00
Updating snapshots of a TaskRunner when status of Task changes
This commit is contained in:
@@ -119,7 +119,7 @@ func (r *AllocRunner) RestoreState() error {
|
||||
}
|
||||
|
||||
// SaveState is used to snapshot our state
|
||||
func (r *AllocRunner) SaveState() error {
|
||||
func (r *AllocRunner) SaveState(taskName string) error {
|
||||
r.taskStatusLock.RLock()
|
||||
snap := allocRunnerState{
|
||||
Alloc: r.alloc,
|
||||
@@ -137,16 +137,28 @@ func (r *AllocRunner) SaveState() error {
|
||||
r.taskLock.RLock()
|
||||
defer r.taskLock.RUnlock()
|
||||
var mErr multierror.Error
|
||||
for name, tr := range r.tasks {
|
||||
if err := tr.SaveState(); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v",
|
||||
r.alloc.ID, name, err)
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
if taskName != "" {
|
||||
tr, ok := r.tasks[taskName]
|
||||
if !ok {
|
||||
mErr.Errors = append(mErr.Errors, fmt.Errorf("[ERR] client: Task with name %v not found in alloc runner %v", taskName, r.alloc.Name))
|
||||
}
|
||||
r.saveTaskRunnerState(tr, &mErr)
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
for _, tr := range r.tasks {
|
||||
r.saveTaskRunnerState(tr, &mErr)
|
||||
}
|
||||
return mErr.ErrorOrNil()
|
||||
}
|
||||
|
||||
func (r *AllocRunner) saveTaskRunnerState(tr *TaskRunner, mErr *multierror.Error) {
|
||||
if err := tr.SaveState(); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to save state for alloc %s task '%s': %v",
|
||||
r.alloc.ID, tr.task.Name, err)
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
}
|
||||
}
|
||||
|
||||
// DestroyState is used to cleanup after ourselves
|
||||
func (r *AllocRunner) DestroyState() error {
|
||||
return os.RemoveAll(filepath.Dir(r.stateFilePath()))
|
||||
@@ -257,6 +269,7 @@ func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
|
||||
Description: desc,
|
||||
}
|
||||
r.taskStatusLock.Unlock()
|
||||
r.SaveState(taskName)
|
||||
select {
|
||||
case r.dirtyCh <- struct{}{}:
|
||||
default:
|
||||
|
||||
@@ -351,7 +351,7 @@ func (c *Client) saveState() error {
|
||||
c.allocLock.RLock()
|
||||
defer c.allocLock.RUnlock()
|
||||
for id, ar := range c.allocs {
|
||||
if err := ar.SaveState(); err != nil {
|
||||
if err := ar.SaveState(""); err != nil {
|
||||
c.logger.Printf("[ERR] client: failed to save state for alloc %s: %v",
|
||||
id, err)
|
||||
mErr.Errors = append(mErr.Errors, err)
|
||||
|
||||
@@ -32,6 +32,8 @@ type TaskRunner struct {
|
||||
destroyCh chan struct{}
|
||||
destroyLock sync.Mutex
|
||||
waitCh chan struct{}
|
||||
|
||||
snapshotLock sync.Mutex
|
||||
}
|
||||
|
||||
// taskRunnerState is used to snapshot the state of the task runner
|
||||
@@ -112,6 +114,8 @@ func (r *TaskRunner) RestoreState() error {
|
||||
|
||||
// SaveState is used to snapshot our state
|
||||
func (r *TaskRunner) SaveState() error {
|
||||
r.snapshotLock.Lock()
|
||||
defer r.snapshotLock.Unlock()
|
||||
snap := taskRunnerState{
|
||||
Task: r.task,
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user