From 1d21991e280ffd97e166af2a5922b7e57286abac Mon Sep 17 00:00:00 2001 From: Alex Dadgar Date: Thu, 5 Nov 2015 09:58:57 -0800 Subject: [PATCH] exec_basic uses Spawner; create shared test harness for executors --- client/driver/executor/exec.go | 12 +- client/driver/executor/exec_basic.go | 84 ++++++-- client/driver/executor/exec_basic_test.go | 7 + client/driver/executor/exec_linux.go | 4 + client/driver/executor/exec_linux_test.go | 210 +------------------- client/driver/executor/test_harness.go | 231 ++++++++++++++++++++++ client/driver/spawn/spawn.go | 2 + 7 files changed, 317 insertions(+), 233 deletions(-) create mode 100644 client/driver/executor/exec_basic_test.go create mode 100644 client/driver/executor/test_harness.go diff --git a/client/driver/executor/exec.go b/client/driver/executor/exec.go index b3878ba0c..8cf076bab 100644 --- a/client/driver/executor/exec.go +++ b/client/driver/executor/exec.go @@ -74,11 +74,16 @@ type Executor interface { } // Command is a mirror of exec.Command that returns a platform-specific Executor -func Command(name string, arg ...string) Executor { +func Command(name string, args ...string) Executor { executor := NewExecutor() - cmd := executor.Command() + SetCommand(executor, name, args) + return executor +} + +func SetCommand(e Executor, name string, args []string) { + cmd := e.Command() cmd.Path = name - cmd.Args = append([]string{name}, arg...) + cmd.Args = append([]string{name}, args...) if filepath.Base(name) == name { if lp, err := exec.LookPath(name); err != nil { @@ -87,7 +92,6 @@ func Command(name string, arg ...string) Executor { cmd.Path = lp } } - return executor } // OpenId is similar to executor.Command but will attempt to reopen with the diff --git a/client/driver/executor/exec_basic.go b/client/driver/executor/exec_basic.go index b9bdebd7d..4b865fbd9 100644 --- a/client/driver/executor/exec_basic.go +++ b/client/driver/executor/exec_basic.go @@ -1,22 +1,29 @@ package executor import ( + "bytes" + "encoding/json" "fmt" "os" "os/exec" - "strconv" + "path/filepath" "strings" "github.com/hashicorp/nomad/client/allocdir" "github.com/hashicorp/nomad/client/driver/args" "github.com/hashicorp/nomad/client/driver/environment" + "github.com/hashicorp/nomad/client/driver/spawn" "github.com/hashicorp/nomad/nomad/structs" ) // BasicExecutor should work everywhere, and as a result does not include // any resource restrictions or runas capabilities. type BasicExecutor struct { - cmd exec.Cmd + cmd exec.Cmd + spawn *spawn.Spawner + taskName string + taskDir string + allocDir string } // TODO: Have raw_exec use this as well. @@ -34,9 +41,13 @@ func (e *BasicExecutor) Limit(resources *structs.Resources) error { func (e *BasicExecutor) ConfigureTaskDir(taskName string, alloc *allocdir.AllocDir) error { taskDir, ok := alloc.TaskDirs[taskName] if !ok { - return fmt.Errorf("Error finding task dir for (%s)", taskName) + fmt.Errorf("Couldn't find task directory for task %v", taskName) } e.cmd.Dir = taskDir + + e.taskDir = taskDir + e.taskName = taskName + e.allocDir = alloc.AllocDir return nil } @@ -64,42 +75,73 @@ func (e *BasicExecutor) Start() error { e.cmd.Args = parsed // We don't want to call ourself. We want to call Start on our embedded Cmd - return e.cmd.Start() + spawnState := filepath.Join(e.allocDir, fmt.Sprintf("%s_%s", e.taskName, "exit_status")) + e.spawn = spawn.NewSpawner(spawnState) + e.spawn.SetCommand(&e.cmd) + e.spawn.SetLogs(&spawn.Logs{ + Stdout: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stdout", e.taskName)), + Stderr: filepath.Join(e.taskDir, allocdir.TaskLocal, fmt.Sprintf("%v.stderr", e.taskName)), + Stdin: os.DevNull, + }) + + return e.spawn.Spawn(nil) } -func (e *BasicExecutor) Open(pid string) error { - pidNum, err := strconv.Atoi(pid) - if err != nil { - return fmt.Errorf("Failed to parse pid %v: %v", pid, err) +func (e *BasicExecutor) Open(id string) error { + var spawn spawn.Spawner + dec := json.NewDecoder(strings.NewReader(id)) + if err := dec.Decode(&spawn); err != nil { + return fmt.Errorf("Failed to parse id: %v", err) } - process, err := os.FindProcess(pidNum) - if err != nil { - return fmt.Errorf("Failed to reopen pid %d: %v", pidNum, err) - } - e.cmd.Process = process + // Setup the executor. + e.spawn = &spawn return nil } func (e *BasicExecutor) Wait() error { - // We don't want to call ourself. We want to call Start on our embedded Cmd - return e.cmd.Wait() + code, err := e.spawn.Wait() + if err != nil { + return err + } + + if code != 0 { + return fmt.Errorf("Task exited with code: %d", code) + } + + return nil } func (e *BasicExecutor) ID() (string, error) { - if e.cmd.Process != nil { - return strconv.Itoa(e.cmd.Process.Pid), nil - } else { - return "", fmt.Errorf("Process has finished or was never started") + if e.spawn == nil { + return "", fmt.Errorf("Process was never started") } + + var buffer bytes.Buffer + enc := json.NewEncoder(&buffer) + if err := enc.Encode(e.spawn); err != nil { + return "", fmt.Errorf("Failed to serialize id: %v", err) + } + + return buffer.String(), nil } func (e *BasicExecutor) Shutdown() error { - return e.ForceStop() + proc, err := os.FindProcess(e.spawn.UserPid) + if err != nil { + return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err) + } + + return proc.Signal(os.Interrupt) } func (e *BasicExecutor) ForceStop() error { - return e.cmd.Process.Kill() + proc, err := os.FindProcess(e.spawn.UserPid) + if err != nil { + return fmt.Errorf("Failed to find user processes %v: %v", e.spawn.UserPid, err) + } + + return proc.Kill() } func (e *BasicExecutor) Command() *exec.Cmd { diff --git a/client/driver/executor/exec_basic_test.go b/client/driver/executor/exec_basic_test.go new file mode 100644 index 000000000..d9eed49f9 --- /dev/null +++ b/client/driver/executor/exec_basic_test.go @@ -0,0 +1,7 @@ +package executor + +import "testing" + +func TestExecutorBasic(t *testing.T) { + testExecutor(t, NewBasicExecutor, nil) +} diff --git a/client/driver/executor/exec_linux.go b/client/driver/executor/exec_linux.go index a75440ba8..cc428ecd7 100644 --- a/client/driver/executor/exec_linux.go +++ b/client/driver/executor/exec_linux.go @@ -41,6 +41,10 @@ var ( ) func NewExecutor() Executor { + return NewLinuxExecutor() +} + +func NewLinuxExecutor() Executor { return &LinuxExecutor{} } diff --git a/client/driver/executor/exec_linux_test.go b/client/driver/executor/exec_linux_test.go index 1b8307b02..c0bd2087a 100644 --- a/client/driver/executor/exec_linux_test.go +++ b/client/driver/executor/exec_linux_test.go @@ -1,217 +1,11 @@ package executor import ( - "fmt" - "io/ioutil" - "os" - "path/filepath" "testing" - "time" - - "github.com/hashicorp/nomad/client/allocdir" - "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/structs" ctestutil "github.com/hashicorp/nomad/client/testutil" ) -var ( - constraint = &structs.Resources{ - CPU: 250, - MemoryMB: 256, - Networks: []*structs.NetworkResource{ - &structs.NetworkResource{ - MBits: 50, - DynamicPorts: []string{"http"}, - }, - }, - } -) - -func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) { - alloc := mock.Alloc() - task := alloc.Job.TaskGroups[0].Tasks[0] - - allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID)) - if err := allocDir.Build([]*structs.Task{task}); err != nil { - t.Fatalf("allocDir.Build() failed: %v", err) - } - - return task.Name, allocDir -} - -func TestExecutorLinux_Start_Invalid(t *testing.T) { - ctestutil.ExecCompatible(t) - invalid := "/bin/foobar" - e := Command(invalid, "1") - - if err := e.Limit(constraint); err != nil { - t.Fatalf("Limit() failed: %v", err) - } - - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - if err := e.ConfigureTaskDir(task, alloc); err != nil { - t.Fatalf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err == nil { - t.Fatalf("Start(%v) should have failed", invalid) - } -} - -func TestExecutorLinux_Start_Wait_Failure_Code(t *testing.T) { - ctestutil.ExecCompatible(t) - e := Command("/bin/date", "-invalid") - - if err := e.Limit(constraint); err != nil { - t.Fatalf("Limit() failed: %v", err) - } - - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - if err := e.ConfigureTaskDir(task, alloc); err != nil { - t.Fatalf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - t.Fatalf("Start() failed: %v", err) - } - - if err := e.Wait(); err == nil { - t.Fatalf("Wait() should have failed") - } -} - -func TestExecutorLinux_Start_Wait(t *testing.T) { - ctestutil.ExecCompatible(t) - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - - taskDir, ok := alloc.TaskDirs[task] - if !ok { - t.Fatalf("No task directory found for task %v", task) - } - - expected := "hello world" - file := filepath.Join(allocdir.TaskLocal, "output.txt") - absFilePath := filepath.Join(taskDir, file) - cmd := fmt.Sprintf(`"%v \"%v\" > %v"`, "/bin/sleep 1 ; echo -n", expected, file) - e := Command("/bin/bash", "-c", cmd) - - if err := e.Limit(constraint); err != nil { - t.Fatalf("Limit() failed: %v", err) - } - - if err := e.ConfigureTaskDir(task, alloc); err != nil { - t.Fatalf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - t.Fatalf("Start() failed: %v", err) - } - - if err := e.Wait(); err != nil { - t.Fatalf("Wait() failed: %v", err) - } - - output, err := ioutil.ReadFile(absFilePath) - if err != nil { - t.Fatalf("Couldn't read file %v", absFilePath) - } - - act := string(output) - if act != expected { - t.Fatalf("Command output incorrectly: want %v; got %v", expected, act) - } -} - -func TestExecutorLinux_Start_Kill(t *testing.T) { - ctestutil.ExecCompatible(t) - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - - taskDir, ok := alloc.TaskDirs[task] - if !ok { - t.Fatalf("No task directory found for task %v", task) - } - - filePath := filepath.Join(taskDir, "output") - e := Command("/bin/bash", "-c", "sleep 1 ; echo \"failure\" > "+filePath) - - if err := e.Limit(constraint); err != nil { - t.Fatalf("Limit() failed: %v", err) - } - - if err := e.ConfigureTaskDir(task, alloc); err != nil { - t.Fatalf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - t.Fatalf("Start() failed: %v", err) - } - - if err := e.Shutdown(); err != nil { - t.Fatalf("Shutdown() failed: %v", err) - } - - time.Sleep(1500 * time.Millisecond) - - // Check that the file doesn't exist. - if _, err := os.Stat(filePath); err == nil { - t.Fatalf("Stat(%v) should have failed: task not killed", filePath) - } -} - -func TestExecutorLinux_Open(t *testing.T) { - ctestutil.ExecCompatible(t) - task, alloc := mockAllocDir(t) - defer alloc.Destroy() - - taskDir, ok := alloc.TaskDirs[task] - if !ok { - t.Fatalf("No task directory found for task %v", task) - } - - expected := "hello world" - file := filepath.Join(allocdir.TaskLocal, "output.txt") - absFilePath := filepath.Join(taskDir, file) - cmd := fmt.Sprintf(`"%v \"%v\" > %v"`, "/bin/sleep 1 ; echo -n", expected, file) - e := Command("/bin/bash", "-c", cmd) - - if err := e.Limit(constraint); err != nil { - t.Fatalf("Limit() failed: %v", err) - } - - if err := e.ConfigureTaskDir(task, alloc); err != nil { - t.Fatalf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) - } - - if err := e.Start(); err != nil { - t.Fatalf("Start() failed: %v", err) - } - - id, err := e.ID() - if err != nil { - t.Fatalf("ID() failed: %v", err) - } - - e2 := NewExecutor() - if err := e2.Open(id); err != nil { - t.Fatalf("Open(%v) failed: %v", id, err) - } - - if err := e2.Wait(); err != nil { - t.Fatalf("Wait() failed: %v", err) - } - - output, err := ioutil.ReadFile(absFilePath) - if err != nil { - t.Fatalf("Couldn't read file %v", absFilePath) - } - - act := string(output) - if act != expected { - t.Fatalf("Command output incorrectly: want %v; got %v", expected, act) - } +func TestExecutorLinux(t *testing.T) { + testExecutor(t, NewLinuxExecutor, ctestutil.ExecCompatible) } diff --git a/client/driver/executor/test_harness.go b/client/driver/executor/test_harness.go new file mode 100644 index 000000000..afdf8610f --- /dev/null +++ b/client/driver/executor/test_harness.go @@ -0,0 +1,231 @@ +package executor + +import ( + "fmt" + "io/ioutil" + "log" + "os" + "path/filepath" + "testing" + "time" + + "github.com/hashicorp/nomad/client/allocdir" + "github.com/hashicorp/nomad/nomad/mock" + "github.com/hashicorp/nomad/nomad/structs" +) + +var ( + constraint = &structs.Resources{ + CPU: 250, + MemoryMB: 256, + Networks: []*structs.NetworkResource{ + &structs.NetworkResource{ + MBits: 50, + DynamicPorts: []string{"http"}, + }, + }, + } +) + +func mockAllocDir(t *testing.T) (string, *allocdir.AllocDir) { + alloc := mock.Alloc() + task := alloc.Job.TaskGroups[0].Tasks[0] + + allocDir := allocdir.NewAllocDir(filepath.Join(os.TempDir(), alloc.ID)) + if err := allocDir.Build([]*structs.Task{task}); err != nil { + log.Panicf("allocDir.Build() failed: %v", err) + } + + return task.Name, allocDir +} + +func testExecutor(t *testing.T, newExecutor func() Executor, compatible func(*testing.T)) { + if compatible != nil { + compatible(t) + } + + command := func(name string, args ...string) Executor { + b := NewExecutor() + SetCommand(b, name, args) + return b + } + + Executor_Start_Invalid(t, command) + Executor_Start_Wait_Failure_Code(t, command) + Executor_Start_Wait(t, command) + Executor_Start_Kill(t, command) + Executor_Open(t, command) +} + +type buildExecCommand func(name string, args ...string) Executor + +func Executor_Start_Invalid(t *testing.T, command buildExecCommand) { + invalid := "/bin/foobar" + e := command(invalid, "1") + + if err := e.Limit(constraint); err != nil { + log.Panicf("Limit() failed: %v", err) + } + + task, alloc := mockAllocDir(t) + defer alloc.Destroy() + if err := e.ConfigureTaskDir(task, alloc); err != nil { + log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) + } + + if err := e.Start(); err == nil { + log.Panicf("Start(%v) should have failed", invalid) + } +} + +func Executor_Start_Wait_Failure_Code(t *testing.T, command buildExecCommand) { + e := command("/bin/date", "-invalid") + + if err := e.Limit(constraint); err != nil { + log.Panicf("Limit() failed: %v", err) + } + + task, alloc := mockAllocDir(t) + defer alloc.Destroy() + if err := e.ConfigureTaskDir(task, alloc); err != nil { + log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) + } + + if err := e.Start(); err != nil { + log.Panicf("Start() failed: %v", err) + } + + if err := e.Wait(); err == nil { + log.Panicf("Wait() should have failed") + } +} + +func Executor_Start_Wait(t *testing.T, command buildExecCommand) { + task, alloc := mockAllocDir(t) + defer alloc.Destroy() + + taskDir, ok := alloc.TaskDirs[task] + if !ok { + log.Panicf("No task directory found for task %v", task) + } + + expected := "hello world" + file := filepath.Join(allocdir.TaskLocal, "output.txt") + absFilePath := filepath.Join(taskDir, file) + cmd := fmt.Sprintf(`"%v \"%v\" > %v"`, "/bin/sleep 1 ; echo -n", expected, file) + e := command("/bin/bash", "-c", cmd) + + if err := e.Limit(constraint); err != nil { + log.Panicf("Limit() failed: %v", err) + } + + if err := e.ConfigureTaskDir(task, alloc); err != nil { + log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) + } + + if err := e.Start(); err != nil { + log.Panicf("Start() failed: %v", err) + } + + if err := e.Wait(); err != nil { + log.Panicf("Wait() failed: %v", err) + } + + output, err := ioutil.ReadFile(absFilePath) + if err != nil { + log.Panicf("Couldn't read file %v", absFilePath) + } + + act := string(output) + if act != expected { + log.Panicf("Command output incorrectly: want %v; got %v", expected, act) + } +} + +func Executor_Start_Kill(t *testing.T, command buildExecCommand) { + task, alloc := mockAllocDir(t) + defer alloc.Destroy() + + taskDir, ok := alloc.TaskDirs[task] + if !ok { + log.Panicf("No task directory found for task %v", task) + } + + filePath := filepath.Join(taskDir, "output") + e := command("/bin/bash", "-c", "sleep 1 ; echo \"failure\" > "+filePath) + + if err := e.Limit(constraint); err != nil { + log.Panicf("Limit() failed: %v", err) + } + + if err := e.ConfigureTaskDir(task, alloc); err != nil { + log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) + } + + if err := e.Start(); err != nil { + log.Panicf("Start() failed: %v", err) + } + + if err := e.Shutdown(); err != nil { + log.Panicf("Shutdown() failed: %v", err) + } + + time.Sleep(1500 * time.Millisecond) + + // Check that the file doesn't exist. + if _, err := os.Stat(filePath); err == nil { + log.Panicf("Stat(%v) should have failed: task not killed", filePath) + } +} + +func Executor_Open(t *testing.T, command buildExecCommand) { + task, alloc := mockAllocDir(t) + defer alloc.Destroy() + + taskDir, ok := alloc.TaskDirs[task] + if !ok { + log.Panicf("No task directory found for task %v", task) + } + + expected := "hello world" + file := filepath.Join(allocdir.TaskLocal, "output.txt") + absFilePath := filepath.Join(taskDir, file) + cmd := fmt.Sprintf(`"%v \"%v\" > %v"`, "/bin/sleep 1 ; echo -n", expected, file) + e := command("/bin/bash", "-c", cmd) + + if err := e.Limit(constraint); err != nil { + log.Panicf("Limit() failed: %v", err) + } + + if err := e.ConfigureTaskDir(task, alloc); err != nil { + log.Panicf("ConfigureTaskDir(%v, %v) failed: %v", task, alloc, err) + } + + if err := e.Start(); err != nil { + log.Panicf("Start() failed: %v", err) + } + + id, err := e.ID() + if err != nil { + log.Panicf("ID() failed: %v", err) + } + + e2 := NewExecutor() + if err := e2.Open(id); err != nil { + log.Panicf("Open(%v) failed: %v", id, err) + } + + if err := e2.Wait(); err != nil { + log.Panicf("Wait() failed: %v", err) + } + + output, err := ioutil.ReadFile(absFilePath) + if err != nil { + log.Panicf("Couldn't read file %v", absFilePath) + } + + act := string(output) + if act != expected { + log.Panicf("Command output incorrectly: want %v; got %v", expected, act) + } +} diff --git a/client/driver/spawn/spawn.go b/client/driver/spawn/spawn.go index ef160611e..b962a1ab4 100644 --- a/client/driver/spawn/spawn.go +++ b/client/driver/spawn/spawn.go @@ -22,6 +22,7 @@ type Spawner struct { SpawnPid int SpawnPpid int StateFile string + UserPid int // User configuration UserCmd *exec.Cmd @@ -137,6 +138,7 @@ func (s *Spawner) Spawn(cb func(pid int) error) error { if resp.ErrorMsg != "" { return fmt.Errorf("Failed to execute user command: %s", resp.ErrorMsg) } + s.UserPid = resp.UserPID case <-time.After(5 * time.Second): return fmt.Errorf("timed out waiting for response") }