client: test alloc runner

This commit is contained in:
Armon Dadgar
2015-08-30 17:10:17 -07:00
parent 65e79fb733
commit f6928545e8
2 changed files with 180 additions and 45 deletions

View File

@@ -159,56 +159,73 @@ func (r *AllocRunner) setAlloc(alloc *structs.Allocation) {
r.alloc = alloc
}
// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() {
var retryCh <-chan time.Time
// dirtySyncState is used to watch for state being marked dirty to sync
func (r *AllocRunner) dirtySyncState() {
for {
select {
case <-retryCh:
case <-r.dirtyCh:
r.retrySyncState(r.destroyCh)
case <-r.destroyCh:
return
}
// Scan the task status to termine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
pending = len(r.taskStatus) < len(r.tasks)
for _, status := range r.taskStatus {
switch status.Status {
case structs.AllocClientStatusRunning:
running = true
case structs.AllocClientStatusDead:
dead = true
case structs.AllocClientStatusFailed:
failed = true
}
}
if len(r.taskStatus) > 0 {
taskDesc, _ := json.Marshal(r.taskStatus)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()
// Determine the alloc status
if failed {
r.alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
r.alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
r.alloc.ClientStatus = structs.AllocClientStatusDead
}
// Attempt to update the status
if err := r.updater(r.alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
r.alloc.ID, r.alloc.ClientStatus, err)
retryCh = time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv))
}
retryCh = nil
}
}
// retrySyncState is used to retry the state sync until success
func (r *AllocRunner) retrySyncState(stopCh chan struct{}) {
for {
err := r.syncStatus()
if err == nil {
return
}
select {
case <-time.After(allocSyncRetryIntv + randomStagger(allocSyncRetryIntv)):
case <-stopCh:
return
}
}
}
// syncStatus is used to run and sync the status when it changes
func (r *AllocRunner) syncStatus() error {
// Scan the task status to termine the status of the alloc
var pending, running, dead, failed bool
r.taskStatusLock.RLock()
pending = len(r.taskStatus) < len(r.tasks)
for _, status := range r.taskStatus {
switch status.Status {
case structs.AllocClientStatusRunning:
running = true
case structs.AllocClientStatusDead:
dead = true
case structs.AllocClientStatusFailed:
failed = true
}
}
if len(r.taskStatus) > 0 {
taskDesc, _ := json.Marshal(r.taskStatus)
r.alloc.ClientDescription = string(taskDesc)
}
r.taskStatusLock.RUnlock()
// Determine the alloc status
if failed {
r.alloc.ClientStatus = structs.AllocClientStatusFailed
} else if running {
r.alloc.ClientStatus = structs.AllocClientStatusRunning
} else if dead && !pending {
r.alloc.ClientStatus = structs.AllocClientStatusDead
}
// Attempt to update the status
if err := r.updater(r.alloc); err != nil {
r.logger.Printf("[ERR] client: failed to update alloc '%s' status to %s: %s",
r.alloc.ID, r.alloc.ClientStatus, err)
return err
}
return nil
}
// setStatus is used to update the allocation status
func (r *AllocRunner) setStatus(status, desc string) {
r.alloc.ClientStatus = status
@@ -222,11 +239,11 @@ func (r *AllocRunner) setStatus(status, desc string) {
// setTaskStatus is used to set the status of a task
func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
r.taskStatusLock.Lock()
defer r.taskStatusLock.Unlock()
r.taskStatus[taskName] = taskStatus{
Status: status,
Description: desc,
}
r.taskStatusLock.Unlock()
select {
case r.dirtyCh <- struct{}{}:
default:
@@ -235,7 +252,7 @@ func (r *AllocRunner) setTaskStatus(taskName, status, desc string) {
// Run is a long running goroutine used to manage an allocation
func (r *AllocRunner) Run() {
go r.syncStatus()
go r.dirtySyncState()
// Check if the allocation is in a terminal status
alloc := r.alloc
@@ -307,6 +324,9 @@ OUTER:
<-tr.WaitCh()
}
// Final state sync
r.retrySyncState(nil)
// Check if we should destroy our state
if r.destroy {
r.DestroyState()

View File

@@ -2,9 +2,11 @@ package client
import (
"testing"
"time"
"github.com/hashicorp/nomad/nomad/mock"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/testutil"
)
type MockAllocStateUpdater struct {
@@ -30,13 +32,126 @@ func testAllocRunner() (*MockAllocStateUpdater, *AllocRunner) {
}
func TestAllocRunner_SimpleRun(t *testing.T) {
}
upd, ar := testAllocRunner()
go ar.Run()
defer ar.Destroy()
func TestAllocRunner_Update(t *testing.T) {
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v", err)
})
}
func TestAllocRunner_Destroy(t *testing.T) {
upd, ar := testAllocRunner()
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = "10"
go ar.Run()
start := time.Now()
// Begin the tear down
go func() {
time.Sleep(100 * time.Millisecond)
ar.Destroy()
}()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
})
if time.Since(start) > time.Second {
t.Fatalf("took too long to terminate")
}
}
func TestAllocRunner_Update(t *testing.T) {
upd, ar := testAllocRunner()
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = "10"
go ar.Run()
defer ar.Destroy()
start := time.Now()
// Update the alloc definition
newAlloc := new(structs.Allocation)
*newAlloc = *ar.alloc
newAlloc.DesiredStatus = structs.AllocDesiredStatusStop
ar.Update(newAlloc)
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
})
if time.Since(start) > time.Second {
t.Fatalf("took too long to terminate")
}
}
func TestAllocRunner_SaveRestoreState(t *testing.T) {
upd, ar := testAllocRunner()
// Ensure task takes some time
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
task.Config["command"] = "/bin/sleep"
task.Config["args"] = "10"
go ar.Run()
defer ar.Destroy()
// Snapshot state
time.Sleep(200 * time.Millisecond)
err := ar.SaveState()
if err != nil {
t.Fatalf("err: %v", err)
}
// Create a new alloc runner
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
&structs.Allocation{ID: ar.alloc.ID})
err = ar2.RestoreState()
if err != nil {
t.Fatalf("err: %v", err)
}
go ar2.Run()
defer ar2.Destroy()
// Destroy and wait
ar2.Destroy()
start := time.Now()
testutil.WaitForResult(func() (bool, error) {
if upd.Count == 0 {
return false, nil
}
last := upd.Allocs[upd.Count-1]
return last.ClientStatus == structs.AllocClientStatusDead, nil
}, func(err error) {
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.taskStatus)
})
if time.Since(start) > time.Second {
t.Fatalf("took too long to terminate")
}
}