mirror of
https://github.com/kemko/nomad.git
synced 2026-01-05 01:45:44 +03:00
Merge pull request #755 from hashicorp/b-alloc-runner-destroy
Fix AllocRunner not capturing destroy signal and tests
This commit is contained in:
@@ -326,15 +326,8 @@ func (r *AllocRunner) Run() {
|
||||
defer close(r.waitCh)
|
||||
go r.dirtySyncState()
|
||||
|
||||
// Check if the allocation is in a terminal status
|
||||
alloc := r.alloc
|
||||
if alloc.TerminalStatus() {
|
||||
r.logger.Printf("[DEBUG] client: aborting runner for alloc '%s', terminal status", r.alloc.ID)
|
||||
return
|
||||
}
|
||||
r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID)
|
||||
|
||||
// Find the task group to run in the allocation
|
||||
alloc := r.alloc
|
||||
tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup)
|
||||
if tg == nil {
|
||||
r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup)
|
||||
@@ -353,7 +346,18 @@ func (r *AllocRunner) Run() {
|
||||
r.ctx = driver.NewExecContext(allocDir, r.alloc.ID)
|
||||
}
|
||||
|
||||
// Check if the allocation is in a terminal status. In this case, we don't
|
||||
// start any of the task runners and directly wait for the destroy signal to
|
||||
// clean up the allocation.
|
||||
if alloc.TerminalStatus() {
|
||||
r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID)
|
||||
r.handleDestroy()
|
||||
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
|
||||
return
|
||||
}
|
||||
|
||||
// Start the task runners
|
||||
r.logger.Printf("[DEBUG] client: starting task runners for alloc '%s'", r.alloc.ID)
|
||||
r.taskLock.Lock()
|
||||
for _, task := range tg.Tasks {
|
||||
if _, ok := r.restored[task.Name]; ok {
|
||||
@@ -415,7 +419,6 @@ OUTER:
|
||||
|
||||
// Destroy each sub-task
|
||||
r.taskLock.Lock()
|
||||
defer r.taskLock.Unlock()
|
||||
for _, tr := range r.tasks {
|
||||
tr.Destroy()
|
||||
}
|
||||
@@ -424,12 +427,21 @@ OUTER:
|
||||
for _, tr := range r.tasks {
|
||||
<-tr.WaitCh()
|
||||
}
|
||||
r.taskLock.Unlock()
|
||||
|
||||
// Final state sync
|
||||
r.retrySyncState(nil)
|
||||
|
||||
// Check if we should destroy our state
|
||||
if r.destroy {
|
||||
// Block until we should destroy the state of the alloc
|
||||
r.handleDestroy()
|
||||
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
|
||||
}
|
||||
|
||||
// handleDestroy blocks till the AllocRunner should be destroyed and does the
|
||||
// necessary cleanup.
|
||||
func (r *AllocRunner) handleDestroy() {
|
||||
select {
|
||||
case <-r.destroyCh:
|
||||
if err := r.DestroyContext(); err != nil {
|
||||
r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v",
|
||||
r.alloc.ID, err)
|
||||
@@ -439,7 +451,6 @@ OUTER:
|
||||
r.alloc.ID, err)
|
||||
}
|
||||
}
|
||||
r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID)
|
||||
}
|
||||
|
||||
// Update is used to update the allocation of the context
|
||||
|
||||
@@ -62,6 +62,95 @@ func TestAllocRunner_SimpleRun(t *testing.T) {
|
||||
})
|
||||
}
|
||||
|
||||
func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) {
|
||||
ctestutil.ExecCompatible(t)
|
||||
upd, ar := testAllocRunner(false)
|
||||
|
||||
// Ensure task takes some time
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Config["command"] = "/bin/sleep"
|
||||
task.Config["args"] = []string{"10"}
|
||||
go ar.Run()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if upd.Count == 0 {
|
||||
return false, fmt.Errorf("No updates")
|
||||
}
|
||||
last := upd.Allocs[upd.Count-1]
|
||||
if last.ClientStatus == structs.AllocClientStatusRunning {
|
||||
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Update the alloc to be terminal which should cause the alloc runner to
|
||||
// stop the tasks and wait for a destroy.
|
||||
update := ar.alloc.Copy()
|
||||
update.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
ar.Update(update)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if upd.Count == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check the status has changed.
|
||||
last := upd.Allocs[upd.Count-1]
|
||||
if last.ClientStatus != structs.AllocClientStatusDead {
|
||||
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
|
||||
}
|
||||
|
||||
// Check the state still exists
|
||||
if _, err := os.Stat(ar.stateFilePath()); err != nil {
|
||||
return false, fmt.Errorf("state file destroyed: %v", err)
|
||||
}
|
||||
|
||||
// Check the alloc directory still exists
|
||||
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
|
||||
return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Send the destroy signal and ensure the AllocRunner cleans up.
|
||||
ar.Destroy()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if upd.Count == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check the status has changed.
|
||||
last := upd.Allocs[upd.Count-1]
|
||||
if last.ClientStatus != structs.AllocClientStatusDead {
|
||||
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
|
||||
}
|
||||
|
||||
// Check the state was cleaned
|
||||
if _, err := os.Stat(ar.stateFilePath()); err == nil {
|
||||
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
|
||||
} else if !os.IsNotExist(err) {
|
||||
return false, fmt.Errorf("stat err: %v", err)
|
||||
}
|
||||
|
||||
// Check the alloc directory was cleaned
|
||||
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
|
||||
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
|
||||
} else if !os.IsNotExist(err) {
|
||||
return false, fmt.Errorf("stat err: %v", err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAllocRunner_Destroy(t *testing.T) {
|
||||
ctestutil.ExecCompatible(t)
|
||||
upd, ar := testAllocRunner(false)
|
||||
@@ -83,10 +172,30 @@ func TestAllocRunner_Destroy(t *testing.T) {
|
||||
if upd.Count == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check the status has changed.
|
||||
last := upd.Allocs[upd.Count-1]
|
||||
return last.ClientStatus == structs.AllocClientStatusDead, nil
|
||||
if last.ClientStatus != structs.AllocClientStatusDead {
|
||||
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
|
||||
}
|
||||
|
||||
// Check the state was cleaned
|
||||
if _, err := os.Stat(ar.stateFilePath()); err == nil {
|
||||
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
|
||||
} else if !os.IsNotExist(err) {
|
||||
return false, fmt.Errorf("stat err: %v", err)
|
||||
}
|
||||
|
||||
// Check the alloc directory was cleaned
|
||||
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
|
||||
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
|
||||
} else if !os.IsNotExist(err) {
|
||||
return false, fmt.Errorf("stat err: %v", err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
if time.Since(start) > 15*time.Second {
|
||||
@@ -129,7 +238,6 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
|
||||
task.Config["command"] = "/bin/sleep"
|
||||
task.Config["args"] = []string{"10"}
|
||||
go ar.Run()
|
||||
defer ar.Destroy()
|
||||
|
||||
// Snapshot state
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
@@ -171,3 +279,106 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) {
|
||||
t.Fatalf("took too long to terminate")
|
||||
}
|
||||
}
|
||||
|
||||
func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) {
|
||||
ctestutil.ExecCompatible(t)
|
||||
upd, ar := testAllocRunner(false)
|
||||
|
||||
// Ensure task takes some time
|
||||
task := ar.alloc.Job.TaskGroups[0].Tasks[0]
|
||||
task.Config["command"] = "/bin/sleep"
|
||||
task.Config["args"] = []string{"10"}
|
||||
go ar.Run()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if upd.Count == 0 {
|
||||
return false, fmt.Errorf("No updates")
|
||||
}
|
||||
last := upd.Allocs[upd.Count-1]
|
||||
if last.ClientStatus == structs.AllocClientStatusRunning {
|
||||
return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
// Update the alloc to be terminal which should cause the alloc runner to
|
||||
// stop the tasks and wait for a destroy.
|
||||
update := ar.alloc.Copy()
|
||||
update.DesiredStatus = structs.AllocDesiredStatusStop
|
||||
ar.Update(update)
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
return ar.alloc.DesiredStatus == structs.AllocDesiredStatusStop, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
|
||||
err := ar.SaveState()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
// Ensure both alloc runners don't destroy
|
||||
ar.destroy = true
|
||||
|
||||
// Create a new alloc runner
|
||||
consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}})
|
||||
ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update,
|
||||
&structs.Allocation{ID: ar.alloc.ID}, consulClient)
|
||||
err = ar2.RestoreState()
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
go ar2.Run()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
// Check the state still exists
|
||||
if _, err := os.Stat(ar.stateFilePath()); err != nil {
|
||||
return false, fmt.Errorf("state file destroyed: %v", err)
|
||||
}
|
||||
|
||||
// Check the alloc directory still exists
|
||||
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil {
|
||||
return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates)
|
||||
})
|
||||
|
||||
// Send the destroy signal and ensure the AllocRunner cleans up.
|
||||
ar2.Destroy()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
if upd.Count == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// Check the status has changed.
|
||||
last := upd.Allocs[upd.Count-1]
|
||||
if last.ClientStatus != structs.AllocClientStatusDead {
|
||||
return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead)
|
||||
}
|
||||
|
||||
// Check the state was cleaned
|
||||
if _, err := os.Stat(ar.stateFilePath()); err == nil {
|
||||
return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath())
|
||||
} else if !os.IsNotExist(err) {
|
||||
return false, fmt.Errorf("stat err: %v", err)
|
||||
}
|
||||
|
||||
// Check the alloc directory was cleaned
|
||||
if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil {
|
||||
return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir)
|
||||
} else if !os.IsNotExist(err) {
|
||||
return false, fmt.Errorf("stat err: %v", err)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %v", err)
|
||||
})
|
||||
}
|
||||
|
||||
@@ -36,7 +36,7 @@ type AllocDir struct {
|
||||
TaskDirs map[string]string
|
||||
|
||||
// A list of locations the shared alloc has been mounted to.
|
||||
mounted []string
|
||||
Mounted []string
|
||||
}
|
||||
|
||||
// AllocFileInfo holds information about a file inside the AllocDir
|
||||
@@ -64,7 +64,7 @@ func NewAllocDir(allocDir string) *AllocDir {
|
||||
// Tears down previously build directory structure.
|
||||
func (d *AllocDir) Destroy() error {
|
||||
// Unmount all mounted shared alloc dirs.
|
||||
for _, m := range d.mounted {
|
||||
for _, m := range d.Mounted {
|
||||
if err := d.unmountSharedDir(m); err != nil {
|
||||
return fmt.Errorf("Failed to unmount shared directory: %v", err)
|
||||
}
|
||||
@@ -233,7 +233,7 @@ func (d *AllocDir) MountSharedDir(task string) error {
|
||||
return fmt.Errorf("Failed to mount shared directory for task %v: %v", task, err)
|
||||
}
|
||||
|
||||
d.mounted = append(d.mounted, taskLoc)
|
||||
d.Mounted = append(d.Mounted, taskLoc)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,7 @@ type testFn func() (bool, error)
|
||||
type errorFn func(error)
|
||||
|
||||
func WaitForResult(test testFn, error errorFn) {
|
||||
WaitForResultRetries(1000*TestMultiplier(), test, error)
|
||||
WaitForResultRetries(2000*TestMultiplier(), test, error)
|
||||
}
|
||||
|
||||
func WaitForResultRetries(retries int64, test testFn, error errorFn) {
|
||||
|
||||
Reference in New Issue
Block a user