diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 69734df50..bc1c7240c 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -159,56 +159,73 @@ func (r *AllocRunner) setAlloc(alloc *structs.Allocation) { r.alloc = alloc } -// syncStatus is used to run and sync the status when it changes -func (r *AllocRunner) syncStatus() { - var retryCh <-chan time.Time +// dirtySyncState is used to watch for state being marked dirty to sync +func (r *AllocRunner) dirtySyncState() { for { select { - case <-retryCh: case <-r.dirtyCh: + r.retrySyncState(r.destroyCh) case <-r.destroyCh: return } - - // Scan the task status to termine the status of the alloc - var pending, running, dead, failed bool - r.taskStatusLock.RLock() - pending = len(r.taskStatus) < len(r.tasks) - for _, status := range r.taskStatus { - switch status.Status { - case structs.AllocClientStatusRunning: - running = true - case structs.AllocClientStatusDead: - dead = true - case structs.AllocClientStatusFailed: - failed = true - } - } - if len(r.taskStatus) > 0 { - taskDesc, _ := json.Marshal(r.taskStatus) - r.alloc.ClientDescription = string(taskDesc) - } - r.taskStatusLock.RUnlock() - - // Determine the alloc status - if failed { - r.alloc.ClientStatus = structs.AllocClientStatusFailed - } else if running { - r.alloc.ClientStatus = structs.AllocClientStatusRunning - } else if dead && !pending { - r.alloc.ClientStatus = structs.AllocClientStatusDead - } - - // Attempt to update the status - if err := r.updater(r.alloc); err != nil { - r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s", - r.alloc.ID, r.alloc.ClientStatus, err) - retryCh = time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)) - } - retryCh = nil } } +// retrySyncState is used to retry the state sync until success +func (r *AllocRunner) retrySyncState(stopCh chan struct{}) { + for { + err := r.syncStatus() + if err == nil { + return + } + select { + case 
<-time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)): + case <-stopCh: + return + } + } +} + +// syncStatus is used to run and sync the status when it changes +func (r *AllocRunner) syncStatus() error { + // Scan the task status to determine the status of the alloc + var pending, running, dead, failed bool + r.taskStatusLock.RLock() + pending = len(r.taskStatus) < len(r.tasks) + for _, status := range r.taskStatus { + switch status.Status { + case structs.AllocClientStatusRunning: + running = true + case structs.AllocClientStatusDead: + dead = true + case structs.AllocClientStatusFailed: + failed = true + } + } + if len(r.taskStatus) > 0 { + taskDesc, _ := json.Marshal(r.taskStatus) + r.alloc.ClientDescription = string(taskDesc) + } + r.taskStatusLock.RUnlock() + + // Determine the alloc status + if failed { + r.alloc.ClientStatus = structs.AllocClientStatusFailed + } else if running { + r.alloc.ClientStatus = structs.AllocClientStatusRunning + } else if dead && !pending { + r.alloc.ClientStatus = structs.AllocClientStatusDead + } + + // Attempt to update the status + if err := r.updater(r.alloc); err != nil { + r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s", + r.alloc.ID, r.alloc.ClientStatus, err) + return err + } + return nil +} + // setStatus is used to update the allocation status func (r *AllocRunner) setStatus(status, desc string) { r.alloc.ClientStatus = status @@ -222,11 +239,11 @@ func (r *AllocRunner) setStatus(status, desc string) { // setTaskStatus is used to set the status of a task func (r *AllocRunner) setTaskStatus(taskName, status, desc string) { r.taskStatusLock.Lock() - defer r.taskStatusLock.Unlock() r.taskStatus[taskName] = taskStatus{ Status: status, Description: desc, } + r.taskStatusLock.Unlock() select { case r.dirtyCh <- struct{}{}: default: @@ -235,7 +252,7 @@ func (r *AllocRunner) setTaskStatus(taskName, status, desc string) { // Run is a long running goroutine used to manage an allocation 
func (r *AllocRunner) Run() { - go r.syncStatus() + go r.dirtySyncState() // Check if the allocation is in a terminal status alloc := r.alloc @@ -307,6 +324,9 @@ OUTER: <-tr.WaitCh() } + // Final state sync + r.retrySyncState(nil) + // Check if we should destroy our state if r.destroy { r.DestroyState() diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index bd605c3b1..16621b0a5 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -2,9 +2,11 @@ package client import ( "testing" + "time" "github.com/hashicorp/nomad/nomad/mock" "github.com/hashicorp/nomad/nomad/structs" + "github.com/hashicorp/nomad/testutil" ) type MockAllocStateUpdater struct { @@ -30,13 +32,126 @@ func testAllocRunner() (*MockAllocStateUpdater, *AllocRunner) { } func TestAllocRunner_SimpleRun(t *testing.T) { -} + upd, ar := testAllocRunner() + go ar.Run() + defer ar.Destroy() -func TestAllocRunner_Update(t *testing.T) { + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + last := upd.Allocs[upd.Count-1] + return last.ClientStatus == structs.AllocClientStatusDead, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) } func TestAllocRunner_Destroy(t *testing.T) { + upd, ar := testAllocRunner() + + // Ensure task takes some time + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = "10" + go ar.Run() + start := time.Now() + + // Begin the tear down + go func() { + time.Sleep(100 * time.Millisecond) + ar.Destroy() + }() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + last := upd.Allocs[upd.Count-1] + return last.ClientStatus == structs.AllocClientStatusDead, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + }) + + if time.Since(start) > time.Second { + t.Fatalf("took too long to terminate") + } +} + +func TestAllocRunner_Update(t *testing.T) { + upd, ar := 
testAllocRunner() + + // Ensure task takes some time + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = "10" + go ar.Run() + defer ar.Destroy() + start := time.Now() + + // Update the alloc definition + newAlloc := new(structs.Allocation) + *newAlloc = *ar.alloc + newAlloc.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(newAlloc) + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + last := upd.Allocs[upd.Count-1] + return last.ClientStatus == structs.AllocClientStatusDead, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + }) + + if time.Since(start) > time.Second { + t.Fatalf("took too long to terminate") + } } func TestAllocRunner_SaveRestoreState(t *testing.T) { + upd, ar := testAllocRunner() + + // Ensure task takes some time + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = "10" + go ar.Run() + defer ar.Destroy() + + // Snapshot state + time.Sleep(200 * time.Millisecond) + err := ar.SaveState() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a new alloc runner + ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, + &structs.Allocation{ID: ar.alloc.ID}) + err = ar2.RestoreState() + if err != nil { + t.Fatalf("err: %v", err) + } + go ar2.Run() + defer ar2.Destroy() + + // Destroy and wait + ar2.Destroy() + start := time.Now() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + last := upd.Allocs[upd.Count-1] + return last.ClientStatus == structs.AllocClientStatusDead, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus) + }) + + if time.Since(start) > time.Second { + t.Fatalf("took too long to terminate") + } }