From c89e97084d0517aa4e045b8e52a4d71b11c4a38b Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 4 Feb 2016 13:09:53 -0800 Subject: [PATCH] Fix AllocRunner not capturing destroy signal and tests --- client/alloc_runner.go | 32 ++++-- client/alloc_runner_test.go | 212 +++++++++++++++++++++++++++++++++++- 2 files changed, 231 insertions(+), 13 deletions(-) diff --git a/client/alloc_runner.go b/client/alloc_runner.go index 584a031e1..b32dc9299 100644 --- a/client/alloc_runner.go +++ b/client/alloc_runner.go @@ -326,15 +326,8 @@ func (r *AllocRunner) Run() { defer close(r.waitCh) go r.dirtySyncState() - // Check if the allocation is in a terminal status - alloc := r.alloc - if alloc.TerminalStatus() { - r.logger.Printf("[DEBUG] client: aborting runner for alloc '%s', terminal status", r.alloc.ID) - return - } - r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID) - // Find the task group to run in the allocation + alloc := r.alloc tg := alloc.Job.LookupTaskGroup(alloc.TaskGroup) if tg == nil { r.logger.Printf("[ERR] client: alloc '%s' for missing task group '%s'", alloc.ID, alloc.TaskGroup) @@ -353,7 +346,17 @@ func (r *AllocRunner) Run() { r.ctx = driver.NewExecContext(allocDir, r.alloc.ID) } + // Check if the allocation is in a terminal status. In this case, we don't + // start any of the task runners and directly wait for the destroy signal to + // clean up the allocation. + if alloc.TerminalStatus() { + r.logger.Printf("[DEBUG] client: alloc %q in terminal status, waiting for destroy", r.alloc.ID) + r.handleDestroy() + r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) + } + // Start the task runners + r.logger.Printf("[DEBUG] client: starting runner for alloc '%s'", r.alloc.ID) r.taskLock.Lock() for _, task := range tg.Tasks { if _, ok := r.restored[task.Name]; ok { @@ -428,8 +431,16 @@ OUTER: // Final state sync r.retrySyncState(nil) - // Check if we should destroy our state - if r.destroy { + // Block until we should destroy the state of the alloc + r.handleDestroy() + r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) +} + +// handleDestroy blocks till the AllocRunner should be destroyed and does the +// necessary cleanup. +func (r *AllocRunner) handleDestroy() { + select { + case <-r.destroyCh: if err := r.DestroyContext(); err != nil { r.logger.Printf("[ERR] client: failed to destroy context for alloc '%s': %v", r.alloc.ID, err) @@ -439,7 +450,6 @@ OUTER: r.alloc.ID, err) } } - r.logger.Printf("[DEBUG] client: terminating runner for alloc '%s'", r.alloc.ID) } // Update is used to update the allocation of the context diff --git a/client/alloc_runner_test.go b/client/alloc_runner_test.go index 5d68e013c..a97544d41 100644 --- a/client/alloc_runner_test.go +++ b/client/alloc_runner_test.go @@ -62,6 +62,95 @@ func TestAllocRunner_SimpleRun(t *testing.T) { }) } +func TestAllocRunner_TerminalUpdate_Destroy(t *testing.T) { + ctestutil.ExecCompatible(t) + upd, ar := testAllocRunner(false) + + // Ensure task takes some time + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = []string{"10"} + go ar.Run() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus == structs.AllocClientStatusRunning { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Update the alloc to be terminal which should cause the alloc runner to + // stop the tasks and wait for a destroy. + update := ar.alloc.Copy() + update.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(update) + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + + // Check the status has changed. + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + + // Check the state still exists + if _, err := os.Stat(ar.stateFilePath()); err != nil { + return false, fmt.Errorf("state file destroyed: %v", err) + } + + // Check the alloc directory still exists + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil { + return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + }) + + // Send the destroy signal and ensure the AllocRunner cleans up. + ar.Destroy() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + + // Check the status has changed. + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + + // Check the state was cleaned + if _, err := os.Stat(ar.stateFilePath()); err == nil { + return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + // Check the alloc directory was cleaned + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil { + return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + }) +} + func TestAllocRunner_Destroy(t *testing.T) { ctestutil.ExecCompatible(t) upd, ar := testAllocRunner(false) @@ -83,8 +172,28 @@ func TestAllocRunner_Destroy(t *testing.T) { if upd.Count == 0 { return false, nil } + + // Check the status has changed. last := upd.Allocs[upd.Count-1] - return last.ClientStatus == structs.AllocClientStatusDead, nil + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + + // Check the state was cleaned + if _, err := os.Stat(ar.stateFilePath()); err == nil { + return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + // Check the alloc directory was cleaned + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil { + return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + return true, nil }, func(err error) { t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) }) @@ -129,7 +238,6 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { task.Config["command"] = "/bin/sleep" task.Config["args"] = []string{"10"} go ar.Run() - defer ar.Destroy() // Snapshot state testutil.WaitForResult(func() (bool, error) { @@ -171,3 +279,103 @@ func TestAllocRunner_SaveRestoreState(t *testing.T) { t.Fatalf("took too long to terminate") } } + +func TestAllocRunner_SaveRestoreState_TerminalAlloc(t *testing.T) { + ctestutil.ExecCompatible(t) + upd, ar := testAllocRunner(false) + + // Ensure task takes some time + task := ar.alloc.Job.TaskGroups[0].Tasks[0] + task.Config["command"] = "/bin/sleep" + task.Config["args"] = []string{"10"} + go ar.Run() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, fmt.Errorf("No updates") + } + last := upd.Allocs[upd.Count-1] + if last.ClientStatus == structs.AllocClientStatusRunning { + return false, fmt.Errorf("got status %v; want %v", last.ClientStatus, structs.AllocClientStatusRunning) + } + return true, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + // Update the alloc to be terminal which should cause the alloc runner to + // stop the tasks and wait for a destroy. + update := ar.alloc.Copy() + update.DesiredStatus = structs.AllocDesiredStatusStop + ar.Update(update) + + testutil.WaitForResult(func() (bool, error) { + return ar.alloc.DesiredStatus == structs.AllocDesiredStatusStop, nil + }, func(err error) { + t.Fatalf("err: %v", err) + }) + + err := ar.SaveState() + if err != nil { + t.Fatalf("err: %v", err) + } + + // Create a new alloc runner + consulClient, err := NewConsulService(&consulServiceConfig{ar.logger, "127.0.0.1:8500", "", "", false, false, &structs.Node{}}) + ar2 := NewAllocRunner(ar.logger, ar.config, upd.Update, + &structs.Allocation{ID: ar.alloc.ID}, consulClient) + err = ar2.RestoreState() + if err != nil { + t.Fatalf("err: %v", err) + } + go ar2.Run() + + testutil.WaitForResult(func() (bool, error) { + // Check the state still exists + if _, err := os.Stat(ar.stateFilePath()); err != nil { + return false, fmt.Errorf("state file destroyed: %v", err) + } + + // Check the alloc directory still exists + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err != nil { + return false, fmt.Errorf("alloc dir destroyed: %v", ar.ctx.AllocDir.AllocDir) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + }) + + // Send the destroy signal and ensure the AllocRunner cleans up. + ar2.Destroy() + + testutil.WaitForResult(func() (bool, error) { + if upd.Count == 0 { + return false, nil + } + + // Check the status has changed. + last := upd.Allocs[upd.Count-1] + if last.ClientStatus != structs.AllocClientStatusDead { + return false, fmt.Errorf("got client status %v; want %v", last.ClientStatus, structs.AllocClientStatusDead) + } + + // Check the state was cleaned + if _, err := os.Stat(ar.stateFilePath()); err == nil { + return false, fmt.Errorf("state file still exists: %v", ar.stateFilePath()) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + // Check the alloc directory was cleaned + if _, err := os.Stat(ar.ctx.AllocDir.AllocDir); err == nil { + return false, fmt.Errorf("alloc dir still exists: %v", ar.ctx.AllocDir.AllocDir) + } else if !os.IsNotExist(err) { + return false, fmt.Errorf("stat err: %v", err) + } + + return true, nil + }, func(err error) { + t.Fatalf("err: %v %#v %#v", err, upd.Allocs[0], ar.alloc.TaskStates) + }) +}